wangliang 5 anos atrás
pai
commit
5a2d203f6a

+ 7 - 5
themis-business/src/main/java/com/qmth/themis/business/listener/RocketSessionConsumer.java

@@ -62,17 +62,18 @@ public class RocketSessionConsumer implements
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         try {
             for (MessageExt messageExt : msgs) {
-                log.info("sessionConsumer重试次数:{}", messageExt.getReconsumeTimes());
+                long threadId = Thread.currentThread().getId();
+                log.info(":{} sessionConsumer 重试次数:{}", threadId, messageExt.getReconsumeTimes());
                 MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
-                log.info("sessionConsumer接受到的消息:{}", JSONObject.toJSONString(mqDto));
-                log.info("sessionConsumer mqDto sequence:{},tag:{}", mqDto.getSequence(), mqDto.getTag());
+                log.info(":{} sessionConsumer 接受到的消息:{}", threadId, JSONObject.toJSONString(mqDto));
+                log.info(":{} sessionConsumer mqDto sequence:{},tag:{}", threadId, mqDto.getSequence(), mqDto.getTag());
                 if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
-                    log.info("更新db");
+                    log.info(":{} 更新db", threadId);
                     tbSessionService.saveSessionInfo(toJavaObject((JSON) mqDto.getBody(), TBSession.class), mqDto.getTimestamp());
                     redisUtil.deleteSessionTopicList(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId());
                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                 } else {
-                    log.info("消息ack未确认,重发");
+                    log.info(":{} 消息ack未确认,重发", threadId);
                     return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
                 }
             }
@@ -136,6 +137,7 @@ public class RocketSessionConsumer implements
             defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
             defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
             defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
+//            defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
             defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
         }
     }