|
@@ -1,27 +1,5 @@
|
|
|
package com.qmth.themis.mq.listener;
|
|
|
|
|
|
-import java.io.IOException;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.Objects;
|
|
|
-
|
|
|
-import javax.annotation.Resource;
|
|
|
-
|
|
|
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|
|
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
|
|
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
|
|
-import org.apache.rocketmq.common.message.Message;
|
|
|
-import org.apache.rocketmq.common.message.MessageExt;
|
|
|
-import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
|
-import org.apache.rocketmq.spring.annotation.SelectorType;
|
|
|
-import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
|
-import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
-import org.springframework.stereotype.Service;
|
|
|
-
|
|
|
import com.google.gson.Gson;
|
|
|
import com.qmth.themis.business.constant.SpringContextHolder;
|
|
|
import com.qmth.themis.business.constant.SystemConstant;
|
|
@@ -37,6 +15,26 @@ import com.qmth.themis.business.util.JacksonUtil;
|
|
|
import com.qmth.themis.business.util.RedisUtil;
|
|
|
import com.qmth.themis.common.contanst.Constants;
|
|
|
import com.qmth.themis.mq.dto.MqDto;
|
|
|
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
|
|
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
|
|
+import org.apache.rocketmq.common.message.Message;
|
|
|
+import org.apache.rocketmq.common.message.MessageExt;
|
|
|
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
|
+import org.apache.rocketmq.spring.annotation.SelectorType;
|
|
|
+import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
|
+import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Objects;
|
|
|
|
|
|
/**
|
|
|
* @Description: 普通消息监听 task
|
|
@@ -76,7 +74,7 @@ public class RocketTaskConsumer {
|
|
|
mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
log.info(":{}-:{} task import Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
// log.info(":{}-:{} task import Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
|
- if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX+mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
+ if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
|
|
|
String tag = mqDto.getTag();
|
|
|
myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
|
|
@@ -85,8 +83,8 @@ public class RocketTaskConsumer {
|
|
|
taskImportTemplete = new TaskExamStudentImportTemplete();
|
|
|
} else if (tag.contains("roomCodeImport")) {
|
|
|
taskImportTemplete = new TaskRoomCodeImportTemplete();
|
|
|
- } else if(tag.contains("examPaperImport")) {
|
|
|
- taskImportTemplete = SpringContextHolder.getBean("taskExamPaperImportTemplete");
|
|
|
+ } else if (tag.contains("examPaperImport")) {
|
|
|
+ taskImportTemplete = SpringContextHolder.getBean("taskExamPaperImportTemplete");
|
|
|
}
|
|
|
try {
|
|
|
taskImportTemplete.importTask(map);
|
|
@@ -98,7 +96,7 @@ public class RocketTaskConsumer {
|
|
|
mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
|
|
|
TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
- redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST+ mqDto.getId());
|
|
|
+ redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST + mqDto.getId());
|
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
} else {
|
|
|
log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
@@ -110,7 +108,7 @@ public class RocketTaskConsumer {
|
|
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
} finally {
|
|
|
if (Objects.nonNull(mqDto)) {
|
|
|
- redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX+mqDto.getId());
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
|
|
|
}
|
|
|
}
|
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
@@ -176,7 +174,7 @@ public class RocketTaskConsumer {
|
|
|
mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
log.info(":{}-:{} task export Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
// log.info(":{}-:{} task ExamStudentImport Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
|
- if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX+ mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
+ if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
|
|
|
String tag = mqDto.getTag();
|
|
|
myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
|
|
@@ -194,7 +192,7 @@ public class RocketTaskConsumer {
|
|
|
mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
|
|
|
TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
- redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST+mqDto.getId());
|
|
|
+ redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST + mqDto.getId());
|
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
} else {
|
|
|
log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
@@ -206,7 +204,7 @@ public class RocketTaskConsumer {
|
|
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
} finally {
|
|
|
if (Objects.nonNull(mqDto)) {
|
|
|
- redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX+ mqDto.getId());
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
|
|
|
}
|
|
|
}
|
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|