|
@@ -41,16 +41,21 @@ public class RedisMessageListener implements MessageListener {
|
|
for (; integer.get() < mqDto.getReconsume(); integer.incrementAndGet()) {
|
|
for (; integer.get() < mqDto.getReconsume(); integer.incrementAndGet()) {
|
|
log.info("reconsume:{}", integer.get());
|
|
log.info("reconsume:{}", integer.get());
|
|
if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE
|
|
if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE
|
|
|
|
+ && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId()))
|
|
&& redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(),
|
|
&& redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(),
|
|
SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
//通道
|
|
//通道
|
|
String topic = new String(message.getChannel(), SystemConstant.CHARSET_NAME);
|
|
String topic = new String(message.getChannel(), SystemConstant.CHARSET_NAME);
|
|
mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
log.info("mqDto:{},topic:{}", JacksonUtil.parseJson(mqDto), JacksonUtil.parseJson(topic));
|
|
log.info("mqDto:{},topic:{}", JacksonUtil.parseJson(mqDto), JacksonUtil.parseJson(topic));
|
|
|
|
+ redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
break;
|
|
break;
|
|
} else {
|
|
} else {
|
|
|
|
+ if (Objects.isNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId()))) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
mqDto.setAck(SystemConstant.REDELIVERED_ACK_TYPE);
|
|
mqDto.setAck(SystemConstant.REDELIVERED_ACK_TYPE);
|
|
- Thread.sleep(Duration.ofSeconds(1L).toMillis() * (integer.get() == 0 ? 1 : integer.get()));
|
|
|
|
|
|
+ Thread.sleep(Duration.ofSeconds(SystemConstant.REDIS_MQ_MAX_RECONSUME).toMillis() * (integer.get() == 0 ? 1 : integer.get()));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|