|
@@ -61,19 +61,20 @@ public class RocketSessionConsumer implements
|
|
|
@Override
|
|
|
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
try {
|
|
|
+ long threadId = Thread.currentThread().getId();
|
|
|
+ String threadName = Thread.currentThread().getName();
|
|
|
for (MessageExt messageExt : msgs) {
|
|
|
- long threadId = Thread.currentThread().getId();
|
|
|
- log.info(":{} sessionConsumer 重试次数:{}", threadId, messageExt.getReconsumeTimes());
|
|
|
+ log.info(":{}-:{} sessionConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
|
|
|
- log.info(":{} sessionConsumer 接受到的消息:{}", threadId, JSONObject.toJSONString(mqDto));
|
|
|
- log.info(":{} sessionConsumer mqDto sequence:{},tag:{}", threadId, mqDto.getSequence(), mqDto.getTag());
|
|
|
+ log.info(":{}-:{} sessionConsumer 接受到的消息:{}", threadId, threadName, JSONObject.toJSONString(mqDto));
|
|
|
+ log.info(":{}-:{} sessionConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
|
if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
|
|
|
- log.info(":{} 更新db", threadId);
|
|
|
+ log.info(":{}-:{} 更新db", threadId, threadName);
|
|
|
tbSessionService.saveSessionInfo(toJavaObject((JSON) mqDto.getBody(), TBSession.class), mqDto.getTimestamp());
|
|
|
redisUtil.deleteSessionTopicList(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
} else {
|
|
|
- log.info(":{} 消息ack未确认,重发", threadId);
|
|
|
+ log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
}
|
|
|
}
|