Эх сурвалжийг харах

mq修改和增加分布式quartz

wangliang 5 жил өмнө
parent
commit
91f1f058d9
17 өөрчлөгдсөн 850 нэмэгдсэн , 133 устгасан
  1. 6 0
      pom.xml
  2. 43 8
      themis-backend/src/main/java/com/qmth/themis/backend/api/TBUserController.java
  3. 22 0
      themis-backend/src/main/java/com/qmth/themis/backend/config/DictionaryConfig.java
  4. 61 52
      themis-backend/src/main/java/com/qmth/themis/backend/interceptor/AuthInterceptor.java
  5. 45 0
      themis-backend/src/main/java/com/qmth/themis/backend/quartz/MqJob.java
  6. 58 12
      themis-backend/src/main/resources/application.properties
  7. 4 0
      themis-business/pom.xml
  8. 3 1
      themis-business/src/main/java/com/qmth/themis/business/constant/SystemConstant.java
  9. 57 0
      themis-business/src/main/java/com/qmth/themis/business/domain/MqConfigDomain.java
  10. 30 0
      themis-business/src/main/java/com/qmth/themis/business/domain/QuartzConfigDomain.java
  11. 1 1
      themis-business/src/main/java/com/qmth/themis/business/entity/TBSession.java
  12. 59 21
      themis-business/src/main/java/com/qmth/themis/business/listener/RocketConsumer.java
  13. 95 0
      themis-business/src/main/java/com/qmth/themis/business/service/QuartzService.java
  14. 8 8
      themis-business/src/main/java/com/qmth/themis/business/service/impl/ProducerServerImpl.java
  15. 251 0
      themis-business/src/main/java/com/qmth/themis/business/service/impl/QuartzServiceImpl.java
  16. 92 21
      themis-business/src/main/java/com/qmth/themis/business/util/RedisUtil.java
  17. 15 9
      themis-exam/src/main/resources/application.properties

+ 6 - 0
pom.xml

@@ -189,6 +189,12 @@
                 <artifactId>spring-boot-starter-websocket</artifactId>
                 <version>${spring-boot.version}</version>
             </dependency>
+            <!--quartz-->
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-starter-quartz</artifactId>
+                <version>${spring-boot.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 

+ 43 - 8
themis-backend/src/main/java/com/qmth/themis/backend/api/TBUserController.java

@@ -2,6 +2,8 @@ package com.qmth.themis.backend.api;
 
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.qmth.themis.backend.config.DictionaryConfig;
+import com.qmth.themis.backend.quartz.MqJob;
 import com.qmth.themis.backend.util.ServletUtil;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.dto.AuthDto;
@@ -9,6 +11,7 @@ import com.qmth.themis.business.entity.TBSession;
 import com.qmth.themis.business.entity.TBUser;
 import com.qmth.themis.business.service.EhcacheService;
 import com.qmth.themis.business.service.ProducerServer;
+import com.qmth.themis.business.service.QuartzService;
 import com.qmth.themis.business.service.TBUserService;
 import com.qmth.themis.business.util.EhcacheUtil;
 import com.qmth.themis.business.util.RedisUtil;
@@ -25,6 +28,7 @@ import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.ResultUtil;
 import io.swagger.annotations.*;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.assertj.core.util.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -64,6 +68,15 @@ public class TBUserController {
     @Resource
     ProducerServer producerServer;
 
+    @Resource
+    QuartzService quartzService;
+
+    @Resource
+    DictionaryConfig dictionaryConfig;
+
+    @Resource
+    RedisUtil redisUtil;
+
     @ApiOperation(value = "用户登录接口")
     @RequestMapping(value = "/login/account", method = RequestMethod.POST)
     @ApiResponses({@ApiResponse(code = 200, message = "用户信息", response = TBUser.class)})
@@ -101,16 +114,21 @@ public class TBUserController {
         //生成token
         String token = RandomStringUtils.randomAlphanumeric(32);
         //添加用户缓存
-        RedisUtil.setUser(user.getId(), user);
+        redisUtil.setUser(user.getId(), user);
         //添加用户会话缓存
         String sessionId = SessionUtil.digest(user.getId(), authDto.getRoleEnum().name(), platform.getSource());
 
         Date expire = SystemConstant.getExpireTime(platform);
-        TBSession tbSession = new TBSession(sessionId, String.valueOf(user.getId()), authDto.getRoleEnum().name(), platform.name(), platform.getSource(), deviceId, request.getLocalAddr(), token, expire);
-        RedisUtil.setUserSession(sessionId, tbSession);
+        TBSession tbSession = new TBSession(sessionId, String.valueOf(user.getId()), authDto.getRoleEnum().name(), platform.getSource(), platform.name(), deviceId, request.getLocalAddr(), token, expire);
+        redisUtil.setUserSession(sessionId, tbSession);
         //往mq发送消息插入会话信息
-        producerServer.sendOneWay(SystemConstant.SESSION_TOPIC, platform.getSource(), tbSession);
-
+        try {
+            producerServer.sendOneWay(SystemConstant.SESSION_TOPIC, platform.getSource(), tbSession.getId());
+            tbSession.setAck(SystemConstant.DELIVERED_ACK_TYPE);
+            redisUtil.setSessionTopicList(sessionId, tbSession);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
         //测试
         String test = SignatureInfo.build(SignatureType.TOKEN, sessionId, token);
         Map<String, Object> map = new HashMap<>();
@@ -124,6 +142,23 @@ public class TBUserController {
     @ApiOperation(value = "用户查询接口")
     @RequestMapping(value = "/list", method = RequestMethod.GET)
     public Result list() {
+        Map<String, Object> map = new HashMap<>();
+        map.put("name", MqJob.class.getName());
+        quartzService.deleteJob(dictionaryConfig.quartzConfigDomain().getJobName(), dictionaryConfig.quartzConfigDomain().getJobGroupName());
+        quartzService.addJob(MqJob.class, dictionaryConfig.quartzConfigDomain().getJobName(), dictionaryConfig.quartzConfigDomain().getJobGroupName(), "0 0/1 * * * ?", map);
+
+//        HashMap<String, Object> map = new HashMap<>();
+//        map.put("name", 1);
+//        quartzService.deleteJob("job", "test");
+//        quartzService.addJob(Job.class, "job", "test", "0 * * * * ?", map);
+//
+//        map.put("name", 2);
+//        quartzService.deleteJob("job2", "test");
+//        quartzService.addJob(Job.class, "job2", "test", "10 * * * * ?", map);
+//
+//        map.put("name", 3);
+//        quartzService.deleteJob("job3", "test2");
+//        quartzService.addJob(Job.class, "job3", "test2", "15 * * * * ?", map);
         return ResultUtil.ok(SystemConstant.SUCCESS);
     }
 
@@ -137,18 +172,18 @@ public class TBUserController {
         if (Objects.isNull(tbSession)) {
             throw new BusinessException(ExceptionResultEnum.LOGIN_NO);
         }
-        RedisUtil.deleteUserSession(tbSession.getId());
+        redisUtil.deleteUserSession(tbSession.getId());
         //循环检查该用户下其他平台是否存在session,不存在则删除用户缓存和鉴权缓存
         boolean delete = true;
         for (Source s : Source.values()) {
             String sessionId = SessionUtil.digest(tbUser.getId(), authDto.getRoleEnum().name(), s.name());
-            if (Objects.nonNull(RedisUtil.getUserSession(sessionId))) {
+            if (Objects.nonNull(redisUtil.getUserSession(sessionId))) {
                 delete = false;
                 break;
             }
         }
         if (delete) {
-            RedisUtil.deleteUser(tbUser.getId());
+            redisUtil.deleteUser(tbUser.getId());
             ehcacheService.removeAccountCache(tbUser.getId());
         }
         return ResultUtil.ok(JSONObject.parseObject(SystemConstant.SUCCESS));

+ 22 - 0
themis-backend/src/main/java/com/qmth/themis/backend/config/DictionaryConfig.java

@@ -70,4 +70,26 @@ public class DictionaryConfig {
         return new AliYunOssDomain();
     }
 
+    /**
+     * mq配置
+     *
+     * @return
+     */
+    @Bean
+    @ConfigurationProperties(prefix = "mq.config", ignoreUnknownFields = false)
+    public MqConfigDomain mqConfigDomain() {
+        return new MqConfigDomain();
+    }
+
+
+    /**
+     * quartz配置
+     *
+     * @return
+     */
+    @Bean
+    @ConfigurationProperties(prefix = "quartz.config", ignoreUnknownFields = false)
+    public QuartzConfigDomain quartzConfigDomain() {
+        return new QuartzConfigDomain();
+    }
 }

+ 61 - 52
themis-backend/src/main/java/com/qmth/themis/backend/interceptor/AuthInterceptor.java

@@ -1,5 +1,6 @@
 package com.qmth.themis.backend.interceptor;
 
+import cn.hutool.http.HttpStatus;
 import com.qmth.themis.backend.config.DictionaryConfig;
 import com.qmth.themis.backend.util.ServletUtil;
 import com.qmth.themis.business.constant.SystemConstant;
@@ -48,6 +49,9 @@ public class AuthInterceptor implements HandlerInterceptor {
     @Resource
     TBUserService tbUserService;
 
+    @Resource
+    RedisUtil redisUtil;
+
     @Override
     public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object object) throws Exception {
         log.info("HandlerInterceptor preHandle is come in");
@@ -61,72 +65,77 @@ public class AuthInterceptor implements HandlerInterceptor {
         if (Objects.isNull(deviceId) || Objects.equals(deviceId, "")) {
             throw new BusinessException(ExceptionResultEnum.DEVICE_ID_INVALID);
         }
-        if (url.equalsIgnoreCase("/error")) {
-            if (response.getStatus() == cn.hutool.http.HttpStatus.HTTP_NOT_FOUND) {
+        if (url.equalsIgnoreCase(SystemConstant.ERROR)) {
+            if (response.getStatus() == HttpStatus.HTTP_NOT_FOUND) {
                 throw new BusinessException(ExceptionResultEnum.NOT_FOUND);
+            } else if (response.getStatus() == HttpStatus.HTTP_INTERNAL_ERROR) {
+                throw new BusinessException(ExceptionResultEnum.SERVICE_NOT_FOUND);
             } else {
                 throw new BusinessException(ExceptionResultEnum.EXCEPTION_ERROR);
             }
         }
         Long userId = null;
-        Long timestamp = Long.parseLong(ServletUtil.getRequestTime(request));
-        if (!expire(timestamp.longValue())) {
-            final SignatureInfo info = SignatureInfo
-                    .parse(method, url, timestamp, ServletUtil.getRequestAuthorization(request));
-            if (Objects.nonNull(info) && info.getType() == SignatureType.TOKEN) {
-                String sessionId = info.getInvoker();
-                TBSession tbSession = (TBSession) RedisUtil.getUserSession(sessionId);
-                if (Objects.isNull(tbSession)) {
-                    throw new BusinessException(ExceptionResultEnum.LOGIN_NO);
-                } else {
-                    if (info.validate(tbSession.getAccessToken()) && info.getTimestamp() < tbSession.getExpireTime().getTime()
-                            && platform.name().equalsIgnoreCase(tbSession.getPlatform()) && Objects.equals(deviceId, tbSession.getDeviceId())) {
-                        userId = Long.parseLong(tbSession.getIdentity());
-                        Date expireTime = tbSession.getExpireTime();
-                        //手机端的token时长为一个月,所以会出现缓存没有的情况
-                        TBUser tbUser = (TBUser) RedisUtil.getUser(userId);
-                        if (Objects.isNull(tbUser)) {
-                            tbUser = tbUserService.getById(userId);
-                            RedisUtil.setUser(tbUser.getId(), tbUser);
-                        }
-                        //还剩5分钟刷新会话缓存
-                        if (Objects.nonNull(expireTime) && (expireTime.getTime() - System.currentTimeMillis()) <= SystemConstant.REFRESH_EXPIRE_TIME) {
-                            RedisUtil.refreshUserSession(sessionId);
-                        }
+//        Long timestamp = Long.parseLong(ServletUtil.getRequestTime(request));
+//        if (!expire(timestamp.longValue())) {
+//            final SignatureInfo info = SignatureInfo
+//                    .parse(method, url, timestamp, ServletUtil.getRequestAuthorization(request));
+        //测试
+        final SignatureInfo info = SignatureInfo
+                .parse(ServletUtil.getRequestAuthorization(request));
+        if (Objects.nonNull(info) && info.getType() == SignatureType.TOKEN) {
+            String sessionId = info.getInvoker();
+            TBSession tbSession = (TBSession) redisUtil.getUserSession(sessionId);
+            if (Objects.isNull(tbSession)) {
+                throw new BusinessException(ExceptionResultEnum.LOGIN_NO);
+            } else {
+                if (info.validate(tbSession.getAccessToken()) && info.getTimestamp() < tbSession.getExpireTime().getTime()
+                        && platform.name().equalsIgnoreCase(tbSession.getPlatform()) && Objects.equals(deviceId, tbSession.getDeviceId())) {
+                    userId = Long.parseLong(tbSession.getIdentity());
+                    Date expireTime = tbSession.getExpireTime();
+                    //手机端的token时长为一个月,所以会出现缓存没有的情况
+                    TBUser tbUser = (TBUser) redisUtil.getUser(userId);
+                    if (Objects.isNull(tbUser)) {
+                        tbUser = tbUserService.getById(userId);
+                        redisUtil.setUser(tbUser.getId(), tbUser);
+                    }
+                    //还剩5分钟刷新会话缓存
+                    if (Objects.nonNull(expireTime) && (expireTime.getTime() - System.currentTimeMillis()) <= SystemConstant.REFRESH_EXPIRE_TIME) {
+                        redisUtil.refreshUserSession(sessionId);
+                    }
 
-                        request.setAttribute(SystemConstant.SESSION, tbSession);
-                        request.setAttribute(SystemConstant.ACCOUNT, tbUser);
-                        //系统公用接口不拦截
-                        List<String> sysUrls = dictionaryConfig.systemUrlDomain().getUrls();
-                        int sysCount = (int) sysUrls.stream().filter(s -> {
-                            return s.equalsIgnoreCase(url);
-                        }).count();
-                        if (sysCount > 0) {
-                            return true;
-                        }
+                    request.setAttribute(SystemConstant.SESSION, tbSession);
+                    request.setAttribute(SystemConstant.ACCOUNT, tbUser);
+                    //系统公用接口不拦截
+                    List<String> sysUrls = dictionaryConfig.systemUrlDomain().getUrls();
+                    int sysCount = (int) sysUrls.stream().filter(s -> {
+                        return s.equalsIgnoreCase(url);
+                    }).count();
+                    if (sysCount > 0) {
+                        return true;
+                    }
 
-                        //验证权限
-                        AuthDto authDto = (AuthDto) EhcacheUtil.get(SystemConstant.AUTH_CACHE, userId);
-                        if (Objects.isNull(authDto)) {
-                            authDto = ehcacheService.addAccountCache(userId);
-                        }
-                        Set<String> urls = authDto.getUrls();
-                        int count = (int) urls.stream().filter(s -> {
-                            return s.equalsIgnoreCase(url);
-                        }).count();
-                        if (count == 0) {
-                            throw new BusinessException(ExceptionResultEnum.UN_AUTHORIZATION);
-                        }
-                    } else {
-                        throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
+                    //验证权限
+                    AuthDto authDto = (AuthDto) EhcacheUtil.get(SystemConstant.AUTH_CACHE, userId);
+                    if (Objects.isNull(authDto)) {
+                        authDto = ehcacheService.addAccountCache(userId);
+                    }
+                    Set<String> urls = authDto.getUrls();
+                    int count = (int) urls.stream().filter(s -> {
+                        return s.equalsIgnoreCase(url);
+                    }).count();
+                    if (count == 0) {
+                        throw new BusinessException(ExceptionResultEnum.UN_AUTHORIZATION);
                     }
+                } else {
+                    throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
                 }
-            } else {
-                throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
             }
         } else {
             throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
         }
+//        } else {
+//            throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
+//        }
         return true;
     }
 

+ 45 - 0
themis-backend/src/main/java/com/qmth/themis/backend/quartz/MqJob.java

@@ -0,0 +1,45 @@
+package com.qmth.themis.backend.quartz;
+
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.entity.TBSession;
+import com.qmth.themis.business.service.ProducerServer;
+import com.qmth.themis.business.util.RedisUtil;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.quartz.QuartzJobBean;
+
+import javax.annotation.Resource;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * @Description: mq job
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/1
+ */
+public class MqJob extends QuartzJobBean {
+    private final static Logger log = LoggerFactory.getLogger(MqJob.class);
+
+    @Resource
+    RedisUtil redisUtil;
+
+    @Resource
+    ProducerServer producerServer;
+
+    @Override
+    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
+        Long size = redisUtil.getHashSize(SystemConstant.SESSION_TOPIC_ERROR_LIST);
+        if (Objects.nonNull(size) && size.longValue() > 0) {
+            log.info("session_topic有异常的消息数为:{}", size);
+            Map map = redisUtil.getHashEntries(SystemConstant.SESSION_TOPIC_ERROR_LIST);
+            map.forEach((k, v) -> {
+                TBSession tbSession = (TBSession) v;
+                producerServer.sendOneWay(SystemConstant.SESSION_TOPIC, tbSession.getSource(), v);
+            });
+        }
+    }
+}

+ 58 - 12
themis-backend/src/main/resources/application.properties

@@ -94,16 +94,6 @@ spring.redis.jedis.timeout=180000
 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
 spring.jackson.time-zone=GMT+8
 
-#rocketmq\u914D\u7F6E
-#rocketmq.name-server=127.0.0.1:9876
-#rocketmq.producer.send-message-timeout=300000
-#rocketmq.producer.group=my-group
-#rocketmq.producer.compress-message-body-threshold=4096
-#rocketmq.producer.max-message-size=4194304
-#rocketmq.producer.retry-times-when-send-async-failed=0
-#rocketmq.producer.retry-next-server=true
-#rocketmq.producer.retry-times-when-send-failed=2
-
 #\u963F\u91CC\u4E91OSS\u914D\u7F6E
 aliyun.oss.name=oss-cn-shenzhen.aliyuncs.com
 aliyun.oss.endpoint=http://${aliyun.oss.name}
@@ -125,22 +115,78 @@ sys.config.oss=false
 #sys.config.serverHost=localhost:7001
 #spring.resources.static-locations=file:${sys.config.serverUpload},classpath:/META-INF/resources/,classpath:/resources/
 
+#============================================================================
+# \u914D\u7F6Erocketmq
+#============================================================================
+#namesrv\u5730\u5740
 rocketmq.name-server=127.0.0.1:9876
+#\u53D1\u9001\u6D88\u606F\u8D85\u65F6\u65F6\u95F4\uFF0C\u5355\u4F4D\u6BEB\u79D2\u3002\u9ED8\u8BA410000
 rocketmq.producer.send-message-timeout=300000
+#Producer\u7EC4\u540D\uFF0C\u591A\u4E2AProducer\u5982\u679C\u5C5E\u4E8E\u4E00\u4E2A\u5E94\u7528\uFF0C\u53D1\u9001\u540C\u6837\u7684\u6D88\u606F\uFF0C\u5219\u5E94\u8BE5\u5C06\u5B83\u4EEC\u5F52\u4E3A\u540C\u4E00\u7EC4\u3002\u9ED8\u8BA4DEFAULT_PRODUCER
 rocketmq.producer.group=my-group
+#\u5BA2\u6237\u7AEF\u9650\u5236\u7684\u6D88\u606F\u5927\u5C0F\uFF0C\u8D85\u8FC7\u62A5\u9519\uFF0C\u540C\u65F6\u670D\u52A1\u7AEF\u4E5F\u4F1A\u9650\u5236\uFF0C\u9700\u8981\u8DDF\u670D\u52A1\u7AEF\u914D\u5408\u4F7F\u7528\u3002\u9ED8\u8BA44MB
 rocketmq.producer.compress-message-body-threshold=4096
 rocketmq.producer.max-message-size=4194304
-rocketmq.producer.retry-times-when-send-async-failed=0
+#\u5982\u679C\u6D88\u606F\u53D1\u9001\u5931\u8D25\uFF0C\u6700\u5927\u91CD\u8BD5\u6B21\u6570\uFF0C\u8BE5\u53C2\u6570\u53EA\u5BF9\u5F02\u6B65\u53D1\u9001\u6A21\u5F0F\u8D77\u4F5C\u7528\u3002\u9ED8\u8BA42
+rocketmq.producer.retry-times-when-send-async-failed=3
+#\u5982\u679C\u6D88\u606F\u53D1\u9001\u5931\u8D25\uFF0C\u662F\u5426\u7EE7\u7EED\u53D1\u4E0B\u4E00\u6761
 rocketmq.producer.retry-next-server=true
+#\u5982\u679C\u6D88\u606F\u53D1\u9001\u5931\u8D25\uFF0C\u6700\u5927\u91CD\u8BD5\u6B21\u6570\uFF0C\u8BE5\u53C2\u6570\u53EA\u5BF9\u540C\u6B65\u53D1\u9001\u6A21\u5F0F\u8D77\u4F5C\u7528\u3002\u9ED8\u8BA42
 rocketmq.producer.retry-times-when-send-failed=3
+#ACK
 rocketmq.producer.access-key=AK
 rocketmq.producer.secret-key=SK
 rocketmq.producer.enable-msg-trace=true
 rocketmq.producer.customized-trace-topic=my-trace-topic
 
+mq.config.sessionTopic=themis-session-topic
+mq.config.sessionTopicTag=web||wxapp||pc
+mq.config.sessionConsumerGroup=themis-group
+#dlq\u6B7B\u4FE1\u961F\u5217
+mq.config.sessionConsumerGroupDlq=${mq.config.sessionConsumerGroup}-dlq
+mq.config.sessionTopicDlq=%DLQ%${mq.config.sessionConsumerGroup}
+#============================================================================
+# \u914D\u7F6EJobStore
+#============================================================================
+spring.quartz.job-store-type=jdbc
+spring.quartz.jdbc.initialize-schema=never
+# JobDataMaps\u662F\u5426\u90FD\u4E3AString\u7C7B\u578B\uFF0C\u9ED8\u8BA4false
+spring.quartz.properties.org.quartz.jobStore.useProperties=false
+# \u8868\u7684\u524D\u7F00\uFF0C\u9ED8\u8BA4QRTZ_
+spring.quartz.properties.org.quartz.jobStore.tablePrefix=QRTZ_
+# \u662F\u5426\u52A0\u5165\u96C6\u7FA4
+spring.quartz.properties.org.quartz.jobStore.isClustered=true
+# \u8C03\u5EA6\u5B9E\u4F8B\u5931\u6548\u7684\u68C0\u67E5\u65F6\u95F4\u95F4\u9694 ms
+spring.quartz.properties.org.quartz.jobStore.clusterCheckinInterval=5000
+# \u5F53\u8BBE\u7F6E\u4E3A\u201Ctrue\u201D\u65F6\uFF0C\u6B64\u5C5E\u6027\u544A\u8BC9Quartz \u5728\u975E\u6258\u7BA1JDBC\u8FDE\u63A5\u4E0A\u8C03\u7528setTransactionIsolation\uFF08Connection.TRANSACTION_READ_COMMITTED\uFF09\u3002
+spring.quartz.properties.org.quartz.jobStore.txIsolationLevelReadCommitted=true
+# \u6570\u636E\u4FDD\u5B58\u65B9\u5F0F\u4E3A\u6570\u636E\u5E93\u6301\u4E45\u5316
+spring.quartz.properties.org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
+# \u6570\u636E\u5E93\u4EE3\u7406\u7C7B\uFF0C\u4E00\u822Corg.quartz.impl.jdbcjobstore.StdJDBCDelegate\u53EF\u4EE5\u6EE1\u8DB3\u5927\u90E8\u5206\u6570\u636E\u5E93
+spring.quartz.properties.org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
+#============================================================================
+# Scheduler \u8C03\u5EA6\u5668\u5C5E\u6027\u914D\u7F6E
+#============================================================================
+# \u8C03\u5EA6\u6807\u8BC6\u540D \u96C6\u7FA4\u4E2D\u6BCF\u4E00\u4E2A\u5B9E\u4F8B\u90FD\u5FC5\u987B\u4F7F\u7528\u76F8\u540C\u7684\u540D\u79F0
+spring.quartz.properties.org.quartz.scheduler.instanceName=ClusterQuartz
+# ID\u8BBE\u7F6E\u4E3A\u81EA\u52A8\u83B7\u53D6 \u6BCF\u4E00\u4E2A\u5FC5\u987B\u4E0D\u540C
+spring.quartz.properties.org.quartz.scheduler.instanceId=AUTO
+
+#============================================================================
+# \u914D\u7F6EThreadPool
+#============================================================================
+# \u7EBF\u7A0B\u6C60\u7684\u5B9E\u73B0\u7C7B\uFF08\u4E00\u822C\u4F7F\u7528SimpleThreadPool\u5373\u53EF\u6EE1\u8DB3\u51E0\u4E4E\u6240\u6709\u7528\u6237\u7684\u9700\u6C42\uFF09
+spring.quartz.properties.org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
+# \u6307\u5B9A\u7EBF\u7A0B\u6570\uFF0C\u4E00\u822C\u8BBE\u7F6E\u4E3A1-100\u76F4\u63A5\u7684\u6574\u6570\uFF0C\u6839\u636E\u7CFB\u7EDF\u8D44\u6E90\u914D\u7F6E
+spring.quartz.properties.org.quartz.threadPool.threadCount=10
+# \u8BBE\u7F6E\u7EBF\u7A0B\u7684\u4F18\u5148\u7EA7(\u53EF\u4EE5\u662FThread.MIN_PRIORITY\uFF08\u53731\uFF09\u548CThread.MAX_PRIORITY\uFF08\u8FD9\u662F10\uFF09\u4E4B\u95F4\u7684\u4EFB\u4F55int \u3002\u9ED8\u8BA4\u503C\u4E3AThread.NORM_PRIORITY\uFF085\uFF09\u3002)
+spring.quartz.properties.org.quartz.threadPool.threadPriority=5
+quartz.config.jobName=backendJob
+quartz.config.jobGroupName=backendGroupName
+
 #api\u524D\u7F00
 prefix.url.admin=api/admin
 
 #\u65E0\u9700\u9274\u6743\u7684url
 no.auth.urls=/webjars/**,/druid/**,/swagger-ui.html,/doc.html,/swagger-resources/**,/v2/api-docs,/webjars/springfox-swagger-ui/**,/api/admin/user/login/account
-common.system.urls=/api/admin/sys/getMenu,/api/admin/user/logout,/api/admin/sys/env
+common.system.urls=/api/admin/sys/getMenu,/api/admin/user/logout,/api/admin/sys/env,/api/admin/user/list

+ 4 - 0
themis-business/pom.xml

@@ -91,5 +91,9 @@
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-spring-boot-starter</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-quartz</artifactId>
+        </dependency>
     </dependencies>
 </project>

+ 3 - 1
themis-business/src/main/java/com/qmth/themis/business/constant/SystemConstant.java

@@ -40,6 +40,7 @@ public class SystemConstant {
     public static final String ALL_PATH = "/**";
     public static final String ACCOUNT = "account";
     public static final String ERROR = "/error";
+    public static final String SESSION_TOPIC_ERROR_LIST = "session:topic:error:list";
     /**
      * session过期时间
      */
@@ -60,12 +61,13 @@ public class SystemConstant {
     public static final int MAXRECONSUMETIMES = 3;
     public static final String PROPERTIES = "properties";
 
-    public static final int DELIVERED_ACK_TYPE = 0;//消息"已接收",但尚未处理结束
+    public static final int DELIVERED_ACK_TYPE = 0;//消息"已发出",但尚未处理结束
     public static final int POSION_ACK_TYPE = 1;//消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)
     public static final int STANDARD_ACK_TYPE = 2;//"标准"类型,通常表示为消息"处理成功",broker端可以删除消息了
     public static final int REDELIVERED_ACK_TYPE = 3;//消息需"重发",比如consumer处理消息时抛出了异常,broker稍后会重新发送此消息
     public static final int INDIVIDUAL_ACK_TYPE = 4;//表示只确认"单条消息",无论在任何ACK_MODE下
     public static final int UNMATCHED_ACK_TYPE = 5;//BROKER间转发消息时,接收端"拒绝"消息
+    public static final int UNSEND_ACK_TYPE = 6;//消息未发出
 
     /**
      * 获取过期时间

+ 57 - 0
themis-business/src/main/java/com/qmth/themis/business/domain/MqConfigDomain.java

@@ -0,0 +1,57 @@
+package com.qmth.themis.business.domain;
+
+/**
+ * @Description: mq配置
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/1
+ */
+public class MqConfigDomain {
+
+    private String sessionTopic;
+    private String sessionTopicTag;
+    private String sessionConsumerGroup;
+    private String sessionConsumerGroupDlq;
+    private String sessionTopicDlq;
+
+    public String getSessionTopic() {
+        return sessionTopic;
+    }
+
+    public void setSessionTopic(String sessionTopic) {
+        this.sessionTopic = sessionTopic;
+    }
+
+    public String getSessionTopicTag() {
+        return sessionTopicTag;
+    }
+
+    public void setSessionTopicTag(String sessionTopicTag) {
+        this.sessionTopicTag = sessionTopicTag;
+    }
+
+    public String getSessionConsumerGroup() {
+        return sessionConsumerGroup;
+    }
+
+    public void setSessionConsumerGroup(String sessionConsumerGroup) {
+        this.sessionConsumerGroup = sessionConsumerGroup;
+    }
+
+    public String getSessionConsumerGroupDlq() {
+        return sessionConsumerGroupDlq;
+    }
+
+    public void setSessionConsumerGroupDlq(String sessionConsumerGroupDlq) {
+        this.sessionConsumerGroupDlq = sessionConsumerGroupDlq;
+    }
+
+    public String getSessionTopicDlq() {
+        return sessionTopicDlq;
+    }
+
+    public void setSessionTopicDlq(String sessionTopicDlq) {
+        this.sessionTopicDlq = sessionTopicDlq;
+    }
+}

+ 30 - 0
themis-business/src/main/java/com/qmth/themis/business/domain/QuartzConfigDomain.java

@@ -0,0 +1,30 @@
+package com.qmth.themis.business.domain;
+
+/**
+ * @Description: quartz配置
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/1
+ */
+public class QuartzConfigDomain {
+
+    private String jobName;
+    private String jobGroupName;
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public String getJobGroupName() {
+        return jobGroupName;
+    }
+
+    public void setJobGroupName(String jobGroupName) {
+        this.jobGroupName = jobGroupName;
+    }
+}

+ 1 - 1
themis-business/src/main/java/com/qmth/themis/business/entity/TBSession.java

@@ -196,6 +196,6 @@ public class TBSession implements Serializable {
         this.expireTime = expireTime;
         this.lastAccessTime = new Date();
         this.updateTime = new Date();
-        this.ack = SystemConstant.DELIVERED_ACK_TYPE;
+        this.ack = SystemConstant.UNSEND_ACK_TYPE;
     }
 }

+ 59 - 21
themis-business/src/main/java/com/qmth/themis/business/listener/RocketConsumer.java

@@ -23,6 +23,9 @@ import org.springframework.stereotype.Service;
 import javax.annotation.Resource;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+
+import static com.alibaba.fastjson.JSONObject.parseObject;
 
 /**
  * @Description: 普通消息监听
@@ -32,45 +35,80 @@ import java.util.Map;
  * @Date: 2020/6/28
  */
 @Service
-public class RocketConsumer {
+public class RocketConsumer implements MessageListenerConcurrently {
 
     private final static Logger log = LoggerFactory.getLogger(RocketConsumer.class);
 
     @Resource
     TBSessionService tbSessionService;
 
+    @Resource
+    RedisUtil redisUtil;
+
+    @Override
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+        try {
+            for (MessageExt messageExt : msgs) {
+                log.info("sessionConsumer重试次数:{}", messageExt.getReconsumeTimes());
+                JSONObject jsonObject = parseObject(new String(messageExt.getBody(), Constants.CHARSET));
+                log.info("sessionConsumer接受到的消息:{}", jsonObject.toJSONString());
+                boolean waitStoreMsgOK = jsonObject.getBoolean("waitStoreMsgOK");
+                if (waitStoreMsgOK) {
+                    Map map = (Map) jsonObject.get(SystemConstant.PROPERTIES);
+                    String sessionId = (String.valueOf(map.get(String.valueOf(map.get("TAGS")))));
+                    if (Objects.nonNull(sessionId)) {
+                        TBSession tbSession = (TBSession) redisUtil.getSessionTopicList(sessionId);
+                        if (Objects.nonNull(tbSession.getAck()) && tbSession.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
+                            tbSessionService.saveSessionInfo(tbSession);
+                            tbSession.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                            redisUtil.setUserSession(tbSession.getId(), tbSession);
+                            redisUtil.deleteSessionTopicList(tbSession.getId());
+                        }
+                    }
+                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+                } else {
+                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+        }
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+    }
+
     @Service
-    @RocketMQMessageListener(consumerGroup = "themis-group", topic = "themis-session-topic", selectorType = SelectorType.TAG, selectorExpression = "web||wxapp||pc")
+    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicTag}")
     public class sessionConsumer implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
 
         @Override
         public void onMessage(Message message) {
             //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-            log.info("实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用");
         }
 
         @Override
         public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
             defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
             defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-            defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
-                @Override
-                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
-                    for (MessageExt messageExt : msgs) {
-                        log.info("重试次数:{}", messageExt.getReconsumeTimes());
-                        JSONObject jsonObject = JSONObject.parseObject(new String(messageExt.getBody(), Constants.CHARSET));
-                        log.info("接受到的消息:{}", jsonObject.toJSONString());
-                        Map map = (Map) jsonObject.get(SystemConstant.PROPERTIES);
-                        TBSession tbSession = JSONObject.toJavaObject(JSONObject.parseObject(String.valueOf(map.get(String.valueOf(map.get("TAGS"))))), TBSession.class);
-                        tbSessionService.saveSessionInfo(tbSession);
-                        tbSession.setAck(SystemConstant.STANDARD_ACK_TYPE);
-                        RedisUtil.setUserSession(tbSession.getId(), tbSession);
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                    }
-                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
-//                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-                }
-            });
+            defaultMQPushConsumer.registerMessageListener(RocketConsumer.this::consumeMessage);
+        }
+    }
+
+    @Service
+    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerGroupDlq}", topic = "${mq.config.sessionTopicDlq}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicTag}")
+    public class dlqSessionConsumer implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
+
+        @Override
+        public void onMessage(Message message) {
+            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
+        }
+
+        @Override
+        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
+            log.info("dlqSessionConsumer死信队列进来了");
+            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
+            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
+            defaultMQPushConsumer.registerMessageListener(RocketConsumer.this::consumeMessage);
         }
     }
 }

+ 95 - 0
themis-business/src/main/java/com/qmth/themis/business/service/QuartzService.java

@@ -0,0 +1,95 @@
+package com.qmth.themis.business.service;
+
+import org.springframework.scheduling.quartz.QuartzJobBean;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @Description: quartz 服务类
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/1
+ */
+public interface QuartzService {
+
+    /**
+     * 增加一个job
+     *
+     * @param jobClass
+     * @param jobName
+     * @param jobGroupName
+     * @param jobTime
+     * @param jobTimes
+     * @param jobData
+     */
+    void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime,
+                int jobTimes, Map jobData);
+
+    /**
+     * 增加一个job
+     *
+     * @param jobClass
+     * @param jobName
+     * @param jobGroupName
+     * @param jobTime
+     * @param jobData
+     */
+    void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, String jobTime, Map jobData);
+
+    /**
+     * 修改一个job的 时间表达式
+     *
+     * @param jobName
+     * @param jobGroupName
+     * @param jobTime
+     */
+    void updateJob(String jobName, String jobGroupName, String jobTime);
+
+    /**
+     * 删除任务一个job
+     *
+     * @param jobName
+     * @param jobGroupName
+     */
+    void deleteJob(String jobName, String jobGroupName);
+
+    /**
+     * 暂停一个job
+     *
+     * @param jobName
+     * @param jobGroupName
+     */
+    void pauseJob(String jobName, String jobGroupName);
+
+    /**
+     * 恢复一个job
+     *
+     * @param jobName
+     * @param jobGroupName
+     */
+    void resumeJob(String jobName, String jobGroupName);
+
+    /**
+     * 立即执行一个job
+     *
+     * @param jobName
+     * @param jobGroupName
+     */
+    void runAJobNow(String jobName, String jobGroupName);
+
+    /**
+     * 获取所有job
+     *
+     * @return
+     */
+    List<Map<String, Object>> queryAllJob();
+
+    /**
+     * 查询正在运行的job
+     *
+     * @return
+     */
+    List<Map<String, Object>> queryRunJob();
+}

+ 8 - 8
themis-business/src/main/java/com/qmth/themis/business/service/impl/ProducerServerImpl.java

@@ -45,9 +45,9 @@ public class ProducerServerImpl implements ProducerServer {
         log.info("syncMsg topic:{},tags:{},msg:{}", topic, tags, JSONObject.toJSONString(msg));
         Message message = new Message();
         message.setTopic(topic);
-        message.setBody(JSONObject.toJSONString(msg).getBytes());
+        message.setBody(String.valueOf(msg).getBytes());
         message.setTags(tags);
-        message.putUserProperty(tags, JSONObject.toJSONString(msg));
+        message.putUserProperty(tags, String.valueOf(msg));
         SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
         // 同步消息发送成功会有一个返回值,我们可以用这个返回值进行判断和获取一些信息
         log.info("sendResult:{}", JSONObject.toJSONString(sendResult));
@@ -67,9 +67,9 @@ public class ProducerServerImpl implements ProducerServer {
         log.info("asyncMsg topic:{},tags:{},msg:{}", topic, tags, JSONObject.toJSONString(msg));
         Message message = new Message();
         message.setTopic(topic);
-        message.setBody(JSONObject.toJSONString(msg).getBytes());
+        message.setBody(String.valueOf(msg).getBytes());
         message.setTags(tags);
-        message.putUserProperty(tags, JSONObject.toJSONString(msg));
+        message.putUserProperty(tags, String.valueOf(msg));
         rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
@@ -99,9 +99,9 @@ public class ProducerServerImpl implements ProducerServer {
         log.info("sendOneWay topic:{},tags:{},msg:{}", topic, tags, JSONObject.toJSONString(msg));
         Message message = new Message();
         message.setTopic(topic);
-        message.setBody(JSONObject.toJSONString(msg).getBytes());
+        message.setBody(String.valueOf(msg).getBytes());
         message.setTags(tags);
-        message.putUserProperty(tags, JSONObject.toJSONString(msg));
+        message.putUserProperty(tags, String.valueOf(msg));
         rocketMQTemplate.sendOneWay(topic + ":" + tags, message);
         return ResultUtil.ok(SystemConstant.SUCCESS);
     }
@@ -119,9 +119,9 @@ public class ProducerServerImpl implements ProducerServer {
         log.info("sendMsgTran topic:{},tags:{},msg:{}", topic, tags, JSONObject.toJSONString(msg));
         Message message = new Message();
         message.setTopic(topic);
-        message.setBody(JSONObject.toJSONString(msg).getBytes());
+        message.setBody(String.valueOf(msg).getBytes());
         message.setTags(tags);
-        message.putUserProperty(tags, JSONObject.toJSONString(msg));
+        message.putUserProperty(tags, String.valueOf(msg));
         org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(message).build();
         //发送事务消息
         TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(topic, messageTran, null);

+ 251 - 0
themis-business/src/main/java/com/qmth/themis/business/service/impl/QuartzServiceImpl.java

@@ -0,0 +1,251 @@
+package com.qmth.themis.business.service.impl;
+
+import com.qmth.themis.business.service.QuartzService;
+import org.quartz.*;
+import org.quartz.impl.matchers.GroupMatcher;
+import org.springframework.scheduling.quartz.QuartzJobBean;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.*;
+
+/**
+ * @Description: quartz 服务实现类
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/1
+ */
+@Service
+public class QuartzServiceImpl implements QuartzService {
+
+    @Resource
+    private Scheduler scheduler;
+
+    /**
+     * 增加一个job
+     *
+     * @param jobClass     任务实现类
+     * @param jobName      任务名称
+     * @param jobGroupName 任务组名
+     * @param jobTime      时间表达式 (这是每隔多少秒为一次任务)
+     * @param jobTimes     运行的次数 (<0:表示不限次数)
+     * @param jobData      参数
+     */
+    @Override
+    public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime,
+                       int jobTimes, Map jobData) {
+        try {
+            // 任务名称和组构成任务key
+            JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)
+                    .build();
+            // 设置job参数
+            if (jobData != null && jobData.size() > 0) {
+                jobDetail.getJobDataMap().putAll(jobData);
+            }
+            // 使用simpleTrigger规则
+            Trigger trigger = null;
+            if (jobTimes < 0) {
+                trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
+                        .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime))
+                        .startNow().build();
+            } else {
+                trigger = TriggerBuilder
+                        .newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder
+                                .repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes))
+                        .startNow().build();
+            }
+            scheduler.scheduleJob(jobDetail, trigger);
+        } catch (SchedulerException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 增加一个job
+     *
+     * @param jobClass     任务实现类
+     * @param jobName      任务名称(建议唯一)
+     * @param jobGroupName 任务组名
+     * @param jobTime      时间表达式 (如:0/5 * * * * ? )
+     * @param jobData      参数
+     */
+    @Override
+    public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, String jobTime, Map jobData) {
+        try {
+            // 创建jobDetail实例,绑定Job实现类
+            // 指明job的名称,所在组的名称,以及绑定job类
+            // 任务名称和组构成任务key
+            JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)
+                    .build();
+            // 设置job参数
+            if (jobData != null && jobData.size() > 0) {
+                jobDetail.getJobDataMap().putAll(jobData);
+            }
+            // 定义调度触发规则
+            // 使用cornTrigger规则
+            // 触发器key
+            Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
+                    .startAt(DateBuilder.futureDate(1, DateBuilder.IntervalUnit.SECOND))
+                    .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).startNow().build();
+            // 把作业和触发器注册到任务调度中
+            scheduler.scheduleJob(jobDetail, trigger);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 修改一个job的 时间表达式
+     *
+     * @param jobName
+     * @param jobGroupName
+     * @param jobTime
+     */
+    @Override
+    public void updateJob(String jobName, String jobGroupName, String jobTime) {
+        try {
+            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
+            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
+            trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
+                    .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build();
+            // 重启触发器
+            scheduler.rescheduleJob(triggerKey, trigger);
+        } catch (SchedulerException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 删除任务一个job
+     *
+     * @param jobName      任务名称
+     * @param jobGroupName 任务组名
+     */
+    @Override
+    public void deleteJob(String jobName, String jobGroupName) {
+        try {
+            scheduler.deleteJob(new JobKey(jobName, jobGroupName));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 暂停一个job
+     *
+     * @param jobName
+     * @param jobGroupName
+     */
+    @Override
+    public void pauseJob(String jobName, String jobGroupName) {
+        try {
+            JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
+            scheduler.pauseJob(jobKey);
+        } catch (SchedulerException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 恢复一个job
+     *
+     * @param jobName
+     * @param jobGroupName
+     */
+    @Override
+    public void resumeJob(String jobName, String jobGroupName) {
+        try {
+            JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
+            scheduler.resumeJob(jobKey);
+        } catch (SchedulerException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 立即执行一个job
+     *
+     * @param jobName
+     * @param jobGroupName
+     */
+    @Override
+    public void runAJobNow(String jobName, String jobGroupName) {
+        try {
+            JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
+            scheduler.triggerJob(jobKey);
+        } catch (SchedulerException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 获取所有job
+     *
+     * @return
+     */
+    @Override
+    public List<Map<String, Object>> queryAllJob() {
+        List<Map<String, Object>> jobList = null;
+        try {
+            GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
+            Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
+            jobList = new ArrayList<Map<String, Object>>();
+            for (JobKey jobKey : jobKeys) {
+                List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
+                for (Trigger trigger : triggers) {
+                    Map<String, Object> map = new HashMap<>();
+                    map.put("jobName", jobKey.getName());
+                    map.put("jobGroupName", jobKey.getGroup());
+                    map.put("description", "触发器:" + trigger.getKey());
+                    Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
+                    map.put("jobStatus", triggerState.name());
+                    if (trigger instanceof CronTrigger) {
+                        CronTrigger cronTrigger = (CronTrigger) trigger;
+                        String cronExpression = cronTrigger.getCronExpression();
+                        map.put("jobTime", cronExpression);
+                    }
+                    jobList.add(map);
+                }
+            }
+        } catch (SchedulerException e) {
+            e.printStackTrace();
+        }
+        return jobList;
+    }
+
+    /**
+     * 获取所有正在运行的job
+     *
+     * @return
+     */
+    @Override
+    public List<Map<String, Object>> queryRunJob() {
+        List<Map<String, Object>> jobList = null;
+        try {
+            List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
+            jobList = new ArrayList<Map<String, Object>>(executingJobs.size());
+            for (JobExecutionContext executingJob : executingJobs) {
+                Map<String, Object> map = new HashMap<String, Object>();
+                JobDetail jobDetail = executingJob.getJobDetail();
+                JobKey jobKey = jobDetail.getKey();
+                Trigger trigger = executingJob.getTrigger();
+                map.put("jobName", jobKey.getName());
+                map.put("jobGroupName", jobKey.getGroup());
+                map.put("description", "触发器:" + trigger.getKey());
+                Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
+                map.put("jobStatus", triggerState.name());
+                if (trigger instanceof CronTrigger) {
+                    CronTrigger cronTrigger = (CronTrigger) trigger;
+                    String cronExpression = cronTrigger.getCronExpression();
+                    map.put("jobTime", cronExpression);
+                }
+                jobList.add(map);
+            }
+        } catch (SchedulerException e) {
+            e.printStackTrace();
+        }
+        return jobList;
+    }
+}
+

+ 92 - 21
themis-business/src/main/java/com/qmth/themis/business/util/RedisUtil.java

@@ -1,9 +1,13 @@
 package com.qmth.themis.business.util;
 
-import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SystemConstant;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -13,16 +17,19 @@ import java.util.concurrent.TimeUnit;
  * @Author: wangliang
  * @Date: 2020/4/15
  */
+@Component
 public class RedisUtil {
 
+    @Resource
+    RedisTemplate redisTemplate;
+
     /**
      * 获取用户信息
      *
      * @param userId
      * @return
      */
-    public static Object getUser(Long userId) {
-        RedisTemplate redisTemplate = SpringContextHolder.getBean(RedisTemplate.class);
+    public Object getUser(Long userId) {
         return redisTemplate.opsForValue().get(SystemConstant.USER + userId);
     }
 
@@ -32,8 +39,7 @@ public class RedisUtil {
      * @param sessionId
      * @return
      */
-    public static Object getUserSession(String sessionId) {
-        RedisTemplate redisTemplate = SpringContextHolder.getBean(RedisTemplate.class);
+    public Object getUserSession(String sessionId) {
         return redisTemplate.opsForValue().get(SystemConstant.SESSION + sessionId);
     }
 
@@ -42,8 +48,7 @@ public class RedisUtil {
      *
      * @param userId
      */
-    public static void deleteUser(Long userId) {
-        RedisTemplate redisTemplate = SpringContextHolder.getBean(RedisTemplate.class);
+    public void deleteUser(Long userId) {
         redisTemplate.delete(SystemConstant.USER + userId);
     }
 
@@ -52,8 +57,7 @@ public class RedisUtil {
      *
      * @param sessionId
      */
-    public static void deleteUserSession(String sessionId) {
-        RedisTemplate redisTemplate = SpringContextHolder.getBean(RedisTemplate.class);
+    public void deleteUserSession(String sessionId) {
         redisTemplate.delete(SystemConstant.SESSION + sessionId);
     }
 
@@ -63,8 +67,7 @@ public class RedisUtil {
      * @param userId
      * @param o
      */
-    public static void setUser(Long userId, Object o) {
-        RedisTemplate redisTemplate = SpringContextHolder.getBean(RedisTemplate.class);
+    public void setUser(Long userId, Object o) {
         redisTemplate.opsForValue().set(SystemConstant.USER + userId, o, SystemConstant.REDIS_EXPIRE_TIME, TimeUnit.SECONDS);
     }
 
@@ -73,8 +76,7 @@ public class RedisUtil {
      *
      * @param userId
      */
-    public static void refreshUser(Long userId) {
-        RedisTemplate redisTemplate = SpringContextHolder.getBean(RedisTemplate.class);
+    public void refreshUser(Long userId) {
         redisTemplate.expire(SystemConstant.USER + userId, SystemConstant.REDIS_REFRESH_EXPIRE_TIME, TimeUnit.SECONDS);
     }
 
@@ -84,8 +86,7 @@ public class RedisUtil {
      * @param userId
      * @return
      */
-    public static Long getUserExpire(Long userId) {
-        RedisTemplate redisTemplate = SpringContextHolder.getBean(RedisTemplate.class);
+    public Long getUserExpire(Long userId) {
         return redisTemplate.getExpire(SystemConstant.USER + userId, TimeUnit.SECONDS);
     }
 
@@ -95,8 +96,7 @@ public class RedisUtil {
      * @param sessionId
      * @param o
      */
-    public static void setUserSession(String sessionId, Object o) {
-        RedisTemplate redisTemplate = SpringContextHolder.getBean(RedisTemplate.class);
+    public void setUserSession(String sessionId, Object o) {
         redisTemplate.opsForValue().set(SystemConstant.SESSION + sessionId, o, SystemConstant.REDIS_EXPIRE_TIME, TimeUnit.SECONDS);
     }
 
@@ -105,8 +105,7 @@ public class RedisUtil {
      *
      * @param sessionId
      */
-    public static void refreshUserSession(String sessionId) {
-        RedisTemplate redisTemplate = SpringContextHolder.getBean(RedisTemplate.class);
+    public void refreshUserSession(String sessionId) {
         redisTemplate.expire(SystemConstant.SESSION + sessionId, SystemConstant.REDIS_REFRESH_EXPIRE_TIME, TimeUnit.SECONDS);
     }
 
@@ -116,8 +115,80 @@ public class RedisUtil {
      * @param sessionId
      * @return
      */
-    public static Long getUserSessionExpire(String sessionId) {
-        RedisTemplate redisTemplate = SpringContextHolder.getBean(RedisTemplate.class);
+    public Long getUserSessionExpire(String sessionId) {
         return redisTemplate.getExpire(SystemConstant.SESSION + sessionId, TimeUnit.SECONDS);
     }
+
+    /**
+     * 批量获取key的value
+     *
+     * @param keys
+     * @return
+     */
+    public List<?> multiGet(Set keys) {
+        return redisTemplate.opsForValue().multiGet(keys);
+    }
+
+    /**
+     * 获取key like
+     *
+     * @param key
+     * @return
+     */
+    public Set getKeyPatterns(String key) {
+        if (null != key) {
+            return redisTemplate.keys(key + "*");
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * 设置hash
+     *
+     * @param hashKey
+     * @param hashValue
+     */
+    public void setSessionTopicList(String hashKey, Object hashValue) {
+        redisTemplate.opsForHash().put(SystemConstant.SESSION_TOPIC_ERROR_LIST, hashKey, hashValue);
+    }
+
+    /**
+     * 获取hash
+     *
+     * @param hashKey
+     * @return
+     */
+    public Object getSessionTopicList(String hashKey) {
+        return redisTemplate.opsForHash().get(SystemConstant.SESSION_TOPIC_ERROR_LIST, hashKey);
+    }
+
+    /**
+     * hash删除
+     *
+     * @param hashKey
+     */
+    public void deleteSessionTopicList(String hashKey) {
+        redisTemplate.opsForHash().delete(SystemConstant.SESSION_TOPIC_ERROR_LIST, hashKey);
+    }
+
+    /**
+     * 获取hash大小
+     *
+     * @param key
+     * @return
+     */
+    public Long getHashSize(String key) {
+        return redisTemplate.opsForHash().size(key);
+    }
+
+    /**
+     * 获取hash map
+     *
+     * @param key
+     * @return
+     */
+    public Map getHashEntries(String key) {
+        return redisTemplate.opsForHash().entries(key);
+    }
 }

+ 15 - 9
themis-exam/src/main/resources/application.properties

@@ -94,15 +94,21 @@ spring.redis.jedis.timeout=180000
 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
 spring.jackson.time-zone=GMT+8
 
-#rocketmq\u914D\u7F6E
-#rocketmq.name-server=127.0.0.1:9876
-#rocketmq.producer.send-message-timeout=300000
-#rocketmq.producer.group=my-group
-#rocketmq.producer.compress-message-body-threshold=4096
-#rocketmq.producer.max-message-size=4194304
-#rocketmq.producer.retry-times-when-send-async-failed=0
-#rocketmq.producer.retry-next-server=true
-#rocketmq.producer.retry-times-when-send-failed=2
+#============================================================================
+# \u914D\u7F6Erocketmq
+#============================================================================
+rocketmq.name-server=127.0.0.1:9876
+rocketmq.producer.send-message-timeout=300000
+rocketmq.producer.group=my-group
+rocketmq.producer.compress-message-body-threshold=4096
+rocketmq.producer.max-message-size=4194304
+rocketmq.producer.retry-times-when-send-async-failed=0
+rocketmq.producer.retry-next-server=true
+rocketmq.producer.retry-times-when-send-failed=3
+rocketmq.producer.access-key=AK
+rocketmq.producer.secret-key=SK
+rocketmq.producer.enable-msg-trace=true
+rocketmq.producer.customized-trace-topic=my-trace-topic
 
 #\u963F\u91CC\u4E91OSS\u914D\u7F6E
 aliyun.oss.name=oss-cn-shenzhen.aliyuncs.com