|
@@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit;
|
|
|
* @Date: 2020/7/2
|
|
|
*/
|
|
|
@Service
|
|
|
-public class RocketTaskConsumer {
|
|
|
+public class RocketTaskConsumer implements MessageListenerConcurrently {
|
|
|
private final static Logger log = LoggerFactory.getLogger(RocketTaskConsumer.class);
|
|
|
|
|
|
@Resource
|
|
@@ -54,6 +54,56 @@ public class RocketTaskConsumer {
|
|
|
@Resource
|
|
|
MyThreadPool myThreadPool;
|
|
|
|
|
|
+ @Override
|
|
|
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
+ MqDto mqDto = null;
|
|
|
+ try {
|
|
|
+ long threadId = Thread.currentThread().getId();
|
|
|
+ String threadName = Thread.currentThread().getName();
|
|
|
+ Gson gson = new Gson();
|
|
|
+ for (MessageExt messageExt : msgs) {
|
|
|
+ log.info(":{}-:{} task ExamStudentImport Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
+ mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
+ log.info(":{}-:{} task ExamStudentImport Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
+ log.info(":{}-:{} task ExamStudentImport Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
|
+ if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.getMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
|
|
|
+ Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
|
|
|
+ String tag = mqDto.getTag();
|
|
|
+ myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
|
|
|
+ TaskImportTemplete taskImportTemplete = null;
|
|
|
+ if (tag.contains("examStudentImport")) {
|
|
|
+ taskImportTemplete = new TaskExamStudentImportTemplete();
|
|
|
+ } else if (tag.contains("roomCodeImport")) {
|
|
|
+ taskImportTemplete = new TaskRoomCodeImportTemplete();
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ taskImportTemplete.importTask(map);
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
+ mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
|
|
|
+ TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
+ tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
+ redisUtil.deleteMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
+ } else {
|
|
|
+ log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
+ } finally {
|
|
|
+ if (Objects.nonNull(mqDto)) {
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 考生导入
|
|
|
*/
|
|
@@ -71,52 +121,7 @@ public class RocketTaskConsumer {
|
|
|
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
- defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
|
|
|
-
|
|
|
- @Override
|
|
|
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
- MqDto mqDto = null;
|
|
|
- try {
|
|
|
- long threadId = Thread.currentThread().getId();
|
|
|
- String threadName = Thread.currentThread().getName();
|
|
|
- Gson gson = new Gson();
|
|
|
- for (MessageExt messageExt : msgs) {
|
|
|
- log.info(":{}-:{} task ExamStudentImport Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
- mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
- log.info(":{}-:{} task ExamStudentImport Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
- log.info(":{}-:{} task ExamStudentImport Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
|
- if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.getMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
|
|
|
- Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
|
|
|
- myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
|
|
|
- TaskImportTemplete examStudentImportTemplete = new TaskExamStudentImportTemplete();
|
|
|
- try {
|
|
|
- examStudentImportTemplete.importTask(map);
|
|
|
- } catch (IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- });
|
|
|
- mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
- mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
|
|
|
- TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
- tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
- redisUtil.deleteMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
- } else {
|
|
|
- log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
- } finally {
|
|
|
- if (Objects.nonNull(mqDto)) {
|
|
|
- redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId());
|
|
|
- }
|
|
|
- }
|
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
- }
|
|
|
- });
|
|
|
+ defaultMQPushConsumer.registerMessageListener(RocketTaskConsumer.this::consumeMessage);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -137,52 +142,7 @@ public class RocketTaskConsumer {
|
|
|
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
- defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
|
|
|
-
|
|
|
- @Override
|
|
|
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
- MqDto mqDto = null;
|
|
|
- try {
|
|
|
- long threadId = Thread.currentThread().getId();
|
|
|
- String threadName = Thread.currentThread().getName();
|
|
|
- Gson gson = new Gson();
|
|
|
- for (MessageExt messageExt : msgs) {
|
|
|
- log.info(":{}-:{} task roomCodeImport Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
- mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
- log.info(":{}-:{} task roomCodeImport Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
- log.info(":{}-:{} task roomCodeImport Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
|
- if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.getMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
|
|
|
- Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
|
|
|
- myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
|
|
|
- TaskImportTemplete roomCodeImportTemplete = new TaskRoomCodeImportTemplete();
|
|
|
- try {
|
|
|
- roomCodeImportTemplete.importTask(map);
|
|
|
- } catch (IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- });
|
|
|
- mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
- mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
|
|
|
- TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
- tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
- redisUtil.deleteMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
- } else {
|
|
|
- log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
- } finally {
|
|
|
- if (Objects.nonNull(mqDto)) {
|
|
|
- redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId());
|
|
|
- }
|
|
|
- }
|
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
- }
|
|
|
- });
|
|
|
+ defaultMQPushConsumer.registerMessageListener(RocketTaskConsumer.this::consumeMessage);
|
|
|
}
|
|
|
}
|
|
|
}
|