Bläddra i källkod

动态注册mq消费者

wangliang 5 år sedan
förälder
incheckning
c13e0344f3

+ 34 - 28
themis-backend/src/main/java/com/qmth/themis/backend/api/TBUserController.java

@@ -5,7 +5,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.google.gson.Gson;
 import com.qmth.themis.backend.config.DictionaryConfig;
-import com.qmth.themis.backend.elasticsearch.entity.ETEStudentEntity;
 import com.qmth.themis.backend.elasticsearch.repository.ETEStudentRepo;
 import com.qmth.themis.backend.elasticsearch.service.ETEStudentService;
 import com.qmth.themis.backend.mongodb.entity.*;
@@ -173,33 +172,40 @@ public class TBUserController {
     @Transactional
     public Result esList() {
 //        eteStudentService.createIndex();
-        List<ETEStudentEntity> list = new ArrayList<>();
-        for (int i = 0; i < 10; i++) {
-            //学生档案
-            ETEStudentEntity eteStudentEntity = new ETEStudentEntity();
-            eteStudentEntity.setId(Constants.idGen.next());
-            eteStudentEntity.setOrgId(1L);
-            eteStudentEntity.setIdentity("test" + i);
-            eteStudentEntity.setPassword("123456");
-            eteStudentEntity.setIdcardNumber(RandomStringUtils.randomAlphanumeric(18));
-            eteStudentEntity.setMobileNumber(RandomStringUtils.randomNumeric(11));
-            eteStudentEntity.setName("java" + i + RandomStringUtils.randomAlphanumeric(30));
-            eteStudentEntity.setGender(1);
-            eteStudentEntity.setBasePhotoPath("http://11111");
-            eteStudentEntity.setCreateTime(new Date());
-            list.add(eteStudentEntity);
-        }
-        eteStudentService.saveAll(list);
-//        Iterator<ETEStudentEntity> iterator = eteStudentService.findAll();
-        org.springframework.data.domain.Page<ETEStudentEntity> eteStudentEntityPage1 = (org.springframework.data.domain.Page<ETEStudentEntity>) eteStudentService.queryName("2");
-//        List<ETEStudentEntity> eteStudentEntityList = eteStudentService.findByNameLike("java");
-        org.springframework.data.domain.Page<ETEStudentEntity> eteStudentEntityPage2 = (org.springframework.data.domain.Page<ETEStudentEntity>) eteStudentService.queryMobileNumber("2");
-//        Optional<ETEStudentEntity> eteStudentEntity = eteStudentRepo.findById(eteStudentEntityPage.getContent().get(0).getId());
-        Map map = new HashMap();
-        map.put(SystemConstant.RECORDS, eteStudentEntityPage1);
-        map.put(SystemConstant.RECORDS + 2, eteStudentEntityPage2);
-//        map.put("bean", eteStudentEntity.get());
-        return ResultUtil.ok(map);
+//        List<ETEStudentEntity> list = new ArrayList<>();
+//        for (int i = 0; i < 10; i++) {
+//            //学生档案
+//            ETEStudentEntity eteStudentEntity = new ETEStudentEntity();
+//            eteStudentEntity.setId(Constants.idGen.next());
+//            eteStudentEntity.setOrgId(1L);
+//            eteStudentEntity.setIdentity("test" + i);
+//            eteStudentEntity.setPassword("123456");
+//            eteStudentEntity.setIdcardNumber(RandomStringUtils.randomAlphanumeric(18));
+//            eteStudentEntity.setMobileNumber(RandomStringUtils.randomNumeric(11));
+//            eteStudentEntity.setName("java" + i + RandomStringUtils.randomAlphanumeric(30));
+//            eteStudentEntity.setGender(1);
+//            eteStudentEntity.setBasePhotoPath("http://11111");
+//            eteStudentEntity.setCreateTime(new Date());
+//            list.add(eteStudentEntity);
+//        }
+//        eteStudentService.saveAll(list);
+////        Iterator<ETEStudentEntity> iterator = eteStudentService.findAll();
+//        org.springframework.data.domain.Page<ETEStudentEntity> eteStudentEntityPage1 = (org.springframework.data.domain.Page<ETEStudentEntity>) eteStudentService.queryName("2");
+////        List<ETEStudentEntity> eteStudentEntityList = eteStudentService.findByNameLike("java");
+//        org.springframework.data.domain.Page<ETEStudentEntity> eteStudentEntityPage2 = (org.springframework.data.domain.Page<ETEStudentEntity>) eteStudentService.queryMobileNumber("2");
+////        Optional<ETEStudentEntity> eteStudentEntity = eteStudentRepo.findById(eteStudentEntityPage.getContent().get(0).getId());
+//        Map map = new HashMap();
+//        map.put(SystemConstant.RECORDS, eteStudentEntityPage1);
+//        map.put(SystemConstant.RECORDS + 2, eteStudentEntityPage2);
+////        map.put("bean", eteStudentEntity.get());
+//        return ResultUtil.ok(map);
+
+//        for (int i = 0; i < 5; i++) {
+        mqDtoService.assembleSendOneWayMsg("imTopic", "teacher", "老师发送的一条消息1", MqEnum.MESSAGE_LOG.name(), "1", "2");
+        mqDtoService.assembleSendOneWayMsg("imTopic", "student", "用户发送的一条消息2", MqEnum.MESSAGE_LOG.name(), "1", "2");
+        mqDtoService.assembleSendOneWayMsg("imTopic", "student", "学生发送的一条消息3", MqEnum.MESSAGE_LOG.name(), "1", "2");
+        //        }
+        return ResultUtil.ok(SystemConstant.SUCCESS);
     }
 
     @ApiOperation(value = "用户查询接口")

+ 45 - 0
themis-backend/src/main/java/com/qmth/themis/backend/start/StartRunning.java

@@ -0,0 +1,45 @@
+package com.qmth.themis.backend.start;
+
+import com.qmth.themis.backend.config.RocketImConsumer;
+import com.qmth.themis.business.entity.TEExamStudent;
+import com.qmth.themis.business.service.TEExamStudentService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.List;
+
+/**
+ * @Description: 服务启动时初始化运行,哪个微服务模块需要则拿此模版去用
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/3
+ */
+@Component
+public class StartRunning implements CommandLineRunner {
+    private final static Logger log = LoggerFactory.getLogger(StartRunning.class);
+
+    @Resource
+    TEExamStudentService teExamStudentService;
+
+    @Resource
+    RocketImConsumer rocketImConsumer;
+
+    @Value("${rocketmq.name-server}")
+    String nameSrvAddr;
+
+    @Override
+    public void run(String... args) throws Exception {
+        log.info("服务器启动时执行 start");
+        List<TEExamStudent> teExamStudentList = teExamStudentService.list();
+        teExamStudentList.forEach(s -> {
+            rocketImConsumer.setRocketMQConsumer(nameSrvAddr, String.valueOf(s.getId()) + "group", "imTopic", "teacher || user");
+//            rocketImConsumer.setRocketMQConsumer(nameSrvAddr, "imGroup", "imTopic", "teacher || user");
+        });
+        log.info("服务器启动时执行 end");
+    }
+}

+ 2 - 0
themis-backend/src/main/resources/application.properties

@@ -150,7 +150,9 @@ rocketmq.producer.retry-times-when-send-failed=3
 #ACK
 rocketmq.producer.access-key=AK
 rocketmq.producer.secret-key=SK
+#\u542F\u7528\u6D88\u606F\u8F68\u8FF9\uFF0C\u9ED8\u8BA4\u503Ctrue
 rocketmq.producer.enable-msg-trace=true
+#\u81EA\u5B9A\u4E49\u7684\u6D88\u606F\u8F68\u8FF9\u4E3B\u9898
 rocketmq.producer.customized-trace-topic=my-trace-topic
 
 mq.config.server=themis

+ 75 - 0
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketImConsumer.java

@@ -0,0 +1,75 @@
+package com.qmth.themis.backend.config;
+
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.common.contanst.Constants;
+import com.qmth.themis.mq.dto.MqDto;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * @Description: rocketmq 动态注册消费者
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/8
+ */
+@Component
+public class RocketImConsumer {
+    private final static Logger log = LoggerFactory.getLogger(RocketImConsumer.class);
+
+    /**
+     * 注册rocketmq 消费者
+     *
+     * @param nameSrvAddr
+     * @param consumerGroupName
+     * @param topic
+     * @param tag
+     * @return
+     */
+    public DefaultMQPushConsumer setRocketMQConsumer(String nameSrvAddr, String consumerGroupName, String topic, String tag) {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroupName);
+        consumer.setNamesrvAddr(nameSrvAddr);
+        consumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        consumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
+        consumer.setMessageModel(MessageModel.BROADCASTING);
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+                try {
+                    long threadId = Thread.currentThread().getId();
+                    String threadName = Thread.currentThread().getName();
+                    for (MessageExt messageExt : msgs) {
+                        log.info(":{}-:{} sessionConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
+                        MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
+                        log.info(":{}-:{} sessionConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                }
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+            }
+        });
+        try {
+            consumer.subscribe(topic, tag);
+            consumer.start();
+            log.info("consumer is start !!! groupName:{},topic:{},tag:{},namesrvAddr:{}", consumerGroupName, topic, tag, nameSrvAddr);
+        } catch (MQClientException e) {
+            e.printStackTrace();
+        }
+        return consumer;
+    }
+}

+ 1 - 1
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketSessionConsumer.java

@@ -62,7 +62,7 @@ public class RocketSessionConsumer implements
             for (MessageExt messageExt : msgs) {
                 log.info(":{}-:{} sessionConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
                 MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
-                log.info(":{}-:{} sessionConsumer 接到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
+                log.info(":{}-:{} sessionConsumer 接到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
                 log.info(":{}-:{} sessionConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
                 if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
                     log.info(":{}-:{} 更新db", threadId, threadName);

+ 1 - 1
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketUserLogConsumer.java

@@ -53,7 +53,7 @@ public class RocketUserLogConsumer implements MessageListenerConcurrently {
                 log.info(":{}-:{} userLogConsumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
 //                MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
                 MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
-                log.info(":{}-:{} userLogConsumer接到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
+                log.info(":{}-:{} userLogConsumer接到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
                 log.info(":{}-:{} userLogConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
                 if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
                     log.info(":{}-:{} 插入用户轨迹日志", threadId, threadName);