Parcourir la source

动态注册mq消费者

wangliang il y a 5 ans
Parent
commit
cf52e44351

+ 2 - 2
themis-backend/src/main/java/com/qmth/themis/backend/api/TBUserController.java

@@ -202,8 +202,8 @@ public class TBUserController {
 
 //        for (int i = 0; i < 5; i++) {
         mqDtoService.assembleSendOneWayMsg("imTopic", "teacher", "老师发送的一条消息1", MqEnum.MESSAGE_LOG.name(), "1", "2");
-        mqDtoService.assembleSendOneWayMsg("imTopic", "student", "用户发送的一条消息2", MqEnum.MESSAGE_LOG.name(), "1", "2");
-        mqDtoService.assembleSendOneWayMsg("imTopic", "student", "学生发送的一条消息3", MqEnum.MESSAGE_LOG.name(), "1", "2");
+        mqDtoService.assembleSendOneWayMsg("imTopic", "user", "用户发送的一条消息2", MqEnum.MESSAGE_LOG.name(), "1", "2");
+        mqDtoService.assembleSendOneWayMsg("imTopic", "user", "学生发送的一条消息3", MqEnum.MESSAGE_LOG.name(), "1", "2");
         //        }
         return ResultUtil.ok(SystemConstant.SUCCESS);
     }

+ 6 - 3
themis-backend/src/main/java/com/qmth/themis/backend/start/StartRunning.java

@@ -1,8 +1,8 @@
 package com.qmth.themis.backend.start;
 
-import com.qmth.themis.backend.config.RocketImConsumer;
 import com.qmth.themis.business.entity.TEExamStudent;
 import com.qmth.themis.business.service.TEExamStudentService;
+import com.qmth.themis.mq.listener.RocketMessageConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
@@ -11,6 +11,7 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
 import java.util.List;
+import java.util.StringJoiner;
 
 /**
  * @Description: 服务启动时初始化运行,哪个微服务模块需要则拿此模版去用
@@ -27,7 +28,7 @@ public class StartRunning implements CommandLineRunner {
     TEExamStudentService teExamStudentService;
 
     @Resource
-    RocketImConsumer rocketImConsumer;
+    RocketMessageConsumer rocketMessageConsumer;
 
     @Value("${rocketmq.name-server}")
     String nameSrvAddr;
@@ -36,10 +37,12 @@ public class StartRunning implements CommandLineRunner {
     public void run(String... args) throws Exception {
         log.info("服务器启动时执行 start");
         List<TEExamStudent> teExamStudentList = teExamStudentService.list();
+        StringJoiner stringJoiner = new StringJoiner("");
         teExamStudentList.forEach(s -> {
-            rocketImConsumer.setRocketMQConsumer(nameSrvAddr, String.valueOf(s.getId()) + "group", "imTopic", "teacher || user");
+            rocketMessageConsumer.setRocketMQConsumer(nameSrvAddr, String.valueOf(s.getId()) + "group", "imTopic", "teacher");
 //            rocketImConsumer.setRocketMQConsumer(nameSrvAddr, "imGroup", "imTopic", "teacher || user");
         });
+//        rocketImConsumer.setRocketMQConsumer(nameSrvAddr, "imGroup", "imTopic", "teacher");
         log.info("服务器启动时执行 end");
     }
 }

+ 5 - 5
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketImConsumer.java → themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketMessageConsumer.java

@@ -1,4 +1,4 @@
-package com.qmth.themis.backend.config;
+package com.qmth.themis.mq.listener;
 
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.util.JacksonUtil;
@@ -26,8 +26,8 @@ import java.util.List;
  * @Date: 2020/7/8
  */
 @Component
-public class RocketImConsumer {
-    private final static Logger log = LoggerFactory.getLogger(RocketImConsumer.class);
+public class RocketMessageConsumer {
+    private final static Logger log = LoggerFactory.getLogger(RocketMessageConsumer.class);
 
     /**
      * 注册rocketmq 消费者
@@ -52,9 +52,9 @@ public class RocketImConsumer {
                     long threadId = Thread.currentThread().getId();
                     String threadName = Thread.currentThread().getName();
                     for (MessageExt messageExt : msgs) {
-                        log.info(":{}-:{} sessionConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
+                        log.info(":{}-:{} messageConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
                         MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
-                        log.info(":{}-:{} sessionConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
+                        log.info(":{}-:{} messageConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
                     }
                 } catch (Exception e) {
                     e.printStackTrace();