|
@@ -75,7 +75,7 @@ public class RocketTaskConsumer {
|
|
mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
log.info(":{}-:{} task import Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
log.info(":{}-:{} task import Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
// log.info(":{}-:{} task import Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
// log.info(":{}-:{} task import Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
- if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.getHashList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
|
|
|
|
|
|
+ if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
|
|
Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
|
|
Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
|
|
String tag = mqDto.getTag();
|
|
String tag = mqDto.getTag();
|
|
myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
|
|
myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
|
|
@@ -97,7 +97,7 @@ public class RocketTaskConsumer {
|
|
mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
|
|
mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
|
|
TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
- redisUtil.deleteHashList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
|
|
|
+ redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST+ mqDto.getId());
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
} else {
|
|
} else {
|
|
log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
@@ -109,7 +109,7 @@ public class RocketTaskConsumer {
|
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
} finally {
|
|
} finally {
|
|
if (Objects.nonNull(mqDto)) {
|
|
if (Objects.nonNull(mqDto)) {
|
|
- redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId());
|
|
|
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX+mqDto.getId());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
@@ -175,7 +175,7 @@ public class RocketTaskConsumer {
|
|
mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
log.info(":{}-:{} task export Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
log.info(":{}-:{} task export Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
// log.info(":{}-:{} task ExamStudentImport Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
// log.info(":{}-:{} task ExamStudentImport Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
- if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.getHashList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
|
|
|
|
|
|
+ if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
|
|
Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
|
|
Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
|
|
String tag = mqDto.getTag();
|
|
String tag = mqDto.getTag();
|
|
myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
|
|
myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
|
|
@@ -193,7 +193,7 @@ public class RocketTaskConsumer {
|
|
mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
|
|
mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
|
|
TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
- redisUtil.deleteHashList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
|
|
|
+ redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST+mqDto.getId());
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
} else {
|
|
} else {
|
|
log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
@@ -205,7 +205,7 @@ public class RocketTaskConsumer {
|
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
} finally {
|
|
} finally {
|
|
if (Objects.nonNull(mqDto)) {
|
|
if (Objects.nonNull(mqDto)) {
|
|
- redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId());
|
|
|
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX+ mqDto.getId());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|