|
@@ -81,11 +81,11 @@ public class MqJob extends QuartzJobBean {
|
|
if (mqDto.getReconsume() >= SystemConstant.MAXRECONSUMETIMES) {
|
|
if (mqDto.getReconsume() >= SystemConstant.MAXRECONSUMETIMES) {
|
|
tmRocketMessageService.saveMqMessageError(mqDto, redisKey);
|
|
tmRocketMessageService.saveMqMessageError(mqDto, redisKey);
|
|
} else {
|
|
} else {
|
|
- mqDto.setReconsume(mqDto.getReconsume() + 1);
|
|
|
|
Map<String, Object> prop = mqDto.getProperties();
|
|
Map<String, Object> prop = mqDto.getProperties();
|
|
Long mqExecTime = Long.parseLong(String.valueOf(prop.get("mqExecTime")));
|
|
Long mqExecTime = Long.parseLong(String.valueOf(prop.get("mqExecTime")));
|
|
//mq为延时消息,所以每分钟扫描时有可能mq消息还未执行,所以加上mq消息等级和当前时间对比,如超过mq的延时时间一分钟则执行
|
|
//mq为延时消息,所以每分钟扫描时有可能mq消息还未执行,所以加上mq消息等级和当前时间对比,如超过mq的延时时间一分钟则执行
|
|
if (Objects.nonNull(mqExecTime) && (mqExecTime - System.currentTimeMillis()) / 1000 / 60 <= -1) {
|
|
if (Objects.nonNull(mqExecTime) && (mqExecTime - System.currentTimeMillis()) / 1000 / 60 <= -1) {
|
|
|
|
+ mqDto.setReconsume(mqDto.getReconsume() + 1);
|
|
producerServer.asyncDelayMsg(mqDto);
|
|
producerServer.asyncDelayMsg(mqDto);
|
|
}
|
|
}
|
|
}
|
|
}
|