wangliang@qmth.com.cn 4 سال پیش
والد
کامیت
c8bc043c9d

+ 2 - 1
themis-backend/src/main/java/com/qmth/themis/backend/api/TBUserController.java

@@ -31,6 +31,7 @@ import com.qmth.themis.common.signature.SignatureType;
 import com.qmth.themis.common.util.AesUtil;
 import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.ResultUtil;
+import com.qmth.themis.mq.enums.MqTopicEnum;
 import com.qmth.themis.mq.service.MqDtoService;
 import io.swagger.annotations.*;
 import org.apache.commons.lang3.RandomStringUtils;
@@ -157,7 +158,7 @@ public class TBUserController {
         TBSession tbSession = new TBSession(sessionId, String.valueOf(user.getId()), authDto.getRoleCodes().toString(), platform.getSource(), platform.name(), deviceId, ServletUtil.getRequest().getLocalAddr(), token, expire);
         redisUtil.setUserSession(sessionId, tbSession);
         //mq发送消息start
-        mqDtoService.assembleSendOneWayMsg(dictionaryConfig.mqConfigDomain().getSessionTopic(), platform.getSource(), tbSession, MqEnum.SESSION.name(), tbSession.getId(), user.getLoginName());
+        mqDtoService.assembleSendOneWayMsg(MqTopicEnum.sessionTopic.getCode(), platform.getSource(), tbSession, MqEnum.SESSION.name(), tbSession.getId(), user.getLoginName());
         mqDtoService.assembleSendOneWayMsg(dictionaryConfig.mqConfigDomain().getUserLogTopic(), authDto.getRoleCodes().toString().contains(RoleEnum.STUDENT.name()) ? dictionaryConfig.mqConfigDomain().getUserLogTopicStudentTag() : dictionaryConfig.mqConfigDomain().getUserLogTopicUserTag(), SystemOperationEnum.LOGIN, MqEnum.USER_LOG.name(), user.getId(), user.getLoginName());
         //mq发送消息end
         //测试

+ 40 - 0
themis-backend/src/main/java/com/qmth/themis/backend/start/StartRunning.java

@@ -0,0 +1,40 @@
+package com.qmth.themis.backend.start;
+
+import com.qmth.themis.mq.enums.MqGroupEnum;
+import com.qmth.themis.mq.enums.MqTagEnum;
+import com.qmth.themis.mq.enums.MqTopicEnum;
+import com.qmth.themis.mq.listener.RocketMessageConsumer;
+import com.qmth.themis.mq.templete.impl.SessionConcurrentlyImpl;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+/**
+ * @Description: 服务启动时初始化运行,哪个微服务模块需要则拿此模版去用
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/3
+ */
+@Component
+public class StartRunning implements CommandLineRunner {
+    private final static Logger log = LoggerFactory.getLogger(StartRunning.class);
+
+    @Resource
+    RocketMessageConsumer rocketMessageConsumer;
+
+    @Value("rocketmq.name-server")
+    String nameServer;
+
+    @Override
+    public void run(String... args) throws Exception {
+        log.info("服务器启动时执行 start");
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerWebGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.web.name(), MessageModel.CLUSTERING,new SessionConcurrentlyImpl());
+        log.info("服务器启动时执行 end");
+    }
+}

+ 1 - 5
themis-backend/src/main/resources/application.properties

@@ -16,7 +16,7 @@ db.host=localhost
 db.port=3306
 db.name=themis
 db.username=root
-db.password=123456789
+db.password=root
 #redis\u6570\u636E\u6E90\u914D\u7F6E
 redis.host=${db.host}
 redis.database=15
@@ -157,12 +157,8 @@ rocketmq.producer.customized-trace-topic=my-trace-topic
 
 mq.config.server=themis
 #session_topic\u76D1\u542C
-mq.config.sessionTopic=${mq.config.server}-topic-session
 mq.config.sessionConsumerGroup=${mq.config.server}-group-session
 
-mq.config.sessionTopicWebTag=web
-mq.config.sessionConsumerWebGroup=${mq.config.sessionConsumerGroup}-${mq.config.sessionTopicWebTag}
-
 mq.config.sessionTopicWxappMonitorTag=wxapp_monitor
 mq.config.sessionConsumerWxappMonitorGroup=${mq.config.sessionConsumerGroup}-${mq.config.sessionTopicWxappMonitorTag}
 

+ 0 - 19
themis-business/src/main/java/com/qmth/themis/business/domain/MqConfigDomain.java

@@ -12,7 +12,6 @@ import java.io.Serializable;
 public class MqConfigDomain implements Serializable {
 
     private String server;
-    private String sessionTopic;
     private String sessionConsumerGroup;
     //    private String sessionConsumerGroupDlq;
 //    private String sessionTopicDlq;
@@ -20,7 +19,6 @@ public class MqConfigDomain implements Serializable {
      * taskTopicExamStudentTag
      * session topic
      */
-    private String sessionTopicWebTag;
     private String sessionTopicWxappMonitorTag;
     private String sessionTopicWxappAnswerTag;
     private String sessionTopicPcTag;
@@ -229,14 +227,6 @@ public class MqConfigDomain implements Serializable {
         this.sessionConsumerWxappAnswerGroup = sessionConsumerWxappAnswerGroup;
     }
 
-    public String getSessionTopic() {
-        return sessionTopic;
-    }
-
-    public void setSessionTopic(String sessionTopic) {
-        this.sessionTopic = sessionTopic;
-    }
-
     public String getSessionConsumerGroup() {
         return sessionConsumerGroup;
     }
@@ -261,15 +251,6 @@ public class MqConfigDomain implements Serializable {
 //        this.sessionTopicDlq = sessionTopicDlq;
 //    }
 
-    public String getSessionTopicWebTag() {
-        return sessionTopicWebTag;
-    }
-
-    public void setSessionTopicWebTag(String sessionTopicWebTag) {
-        this.sessionTopicWebTag = sessionTopicWebTag;
-    }
-
-
     public String getSessionTopicPcTag() {
         return sessionTopicPcTag;
     }

+ 2 - 1
themis-exam/src/main/java/com/qmth/themis/exam/api/TEStudentController.java

@@ -25,6 +25,7 @@ import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.ResultUtil;
 import com.qmth.themis.exam.config.DictionaryConfig;
 import com.qmth.themis.exam.util.ServletUtil;
+import com.qmth.themis.mq.enums.MqTopicEnum;
 import com.qmth.themis.mq.service.MqDtoService;
 import io.swagger.annotations.*;
 import org.apache.commons.lang3.RandomStringUtils;
@@ -145,7 +146,7 @@ public class TEStudentController {
         TBSession tbSession = new TBSession(sessionId, String.valueOf(teStudent.getId()), authDto.getRoleCodes().toString(), platform.getSource(), platform.name(), deviceId, ServletUtil.getRequest().getLocalAddr(), token, expire);
         redisUtil.setUserSession(sessionId, tbSession);
         //mq发送消息start
-        mqDtoService.assembleSendOneWayMsg(dictionaryConfig.mqConfigDomain().getSessionTopic(), platform.getSource(), tbSession, MqEnum.SESSION.name(), tbSession.getId(), teStudent.getIdentity());
+        mqDtoService.assembleSendOneWayMsg(MqTopicEnum.sessionTopic.getCode(), platform.getSource(), tbSession, MqEnum.SESSION.name(), tbSession.getId(), teStudent.getIdentity());
         mqDtoService.assembleSendOneWayMsg(dictionaryConfig.mqConfigDomain().getUserLogTopic(), authDto.getRoleCodes().toString().contains(RoleEnum.STUDENT.name()) ? dictionaryConfig.mqConfigDomain().getUserLogTopicStudentTag() : dictionaryConfig.mqConfigDomain().getUserLogTopicUserTag(), SystemOperationEnum.LOGIN, MqEnum.EXAM_STUDENT_LOG.name(), teStudent.getId(), teStudent.getIdentity());
         //mq发送消息end
         //测试

+ 1 - 1
themis-exam/src/main/resources/application.properties

@@ -16,7 +16,7 @@ db.host=localhost
 db.port=3306
 db.name=themis
 db.username=root
-db.password=123456789
+db.password=root
 #redis\u6570\u636E\u6E90\u914D\u7F6E
 redis.host=${db.host}
 redis.database=15

+ 18 - 0
themis-mq/src/main/java/com/qmth/themis/mq/enums/MqGroupEnum.java

@@ -0,0 +1,18 @@
+package com.qmth.themis.mq.enums;
+
+public enum MqGroupEnum {
+
+    sessionConsumerWebGroup("themis-group-session-web"),
+
+    sessionConsumerWxappMonitorGroup("themis-group-session-wxapp_monitor");
+
+    private MqGroupEnum(String code){
+        this.code = code;
+    }
+
+    private String code;
+
+    public String getCode() {
+        return code;
+    }
+}

+ 6 - 0
themis-mq/src/main/java/com/qmth/themis/mq/enums/MqTagEnum.java

@@ -0,0 +1,6 @@
+package com.qmth.themis.mq.enums;
+
+public enum MqTagEnum {
+
+    web;
+}

+ 16 - 0
themis-mq/src/main/java/com/qmth/themis/mq/enums/MqTopicEnum.java

@@ -0,0 +1,16 @@
+package com.qmth.themis.mq.enums;
+
+public enum MqTopicEnum {
+
+    sessionTopic("themis-topic-session");
+
+    private MqTopicEnum(String code){
+        this.code = code;
+    }
+
+    private String code;
+
+    public String getCode() {
+        return code;
+    }
+}

+ 18 - 18
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketSessionConsumer.java

@@ -144,24 +144,24 @@ public class RocketSessionConsumer implements
 //        return ConsumeOrderlyStatus.SUCCESS;//成功
 //    }
 
-    @Service
-    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWebGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWebTag}")
-    public class sessionConsumerWeb implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-
-        @Override
-        public void onMessage(Message message) {
-            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-        }
-
-        @Override
-        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-//            defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
-            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
-        }
-    }
+//    @Service
+//    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWebGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWebTag}")
+//    public class sessionConsumerWeb implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
+//
+//        @Override
+//        public void onMessage(Message message) {
+//            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
+//        }
+//
+//        @Override
+//        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
+//            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
+//            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+//            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
+////            defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
+//            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
+//        }
+//    }
 
     @Service
     @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerPcGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicPcTag}")

+ 6 - 0
themis-mq/src/main/java/com/qmth/themis/mq/templete/Concurrently.java

@@ -0,0 +1,6 @@
+package com.qmth.themis.mq.templete;
+
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+
+public interface Concurrently extends MessageListenerConcurrently {
+}

+ 4 - 0
themis-mq/src/main/java/com/qmth/themis/mq/templete/Orderly.java

@@ -0,0 +1,4 @@
+package com.qmth.themis.mq.templete;
+
+public interface Orderly {
+}

+ 20 - 0
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/SessionConcurrentlyImpl.java

@@ -0,0 +1,20 @@
+package com.qmth.themis.mq.templete.impl;
+
+import com.qmth.themis.mq.templete.Concurrently;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class SessionConcurrentlyImpl implements Concurrently {
+    private final static Logger log = LoggerFactory.getLogger(SessionConcurrentlyImpl.class);
+
+    @Override
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+        log.info("ConsumeConcurrentlyStatus is come in");
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+    }
+}