|
@@ -4,8 +4,10 @@ import com.google.gson.Gson;
|
|
import com.qmth.themis.business.constant.SystemConstant;
|
|
import com.qmth.themis.business.constant.SystemConstant;
|
|
import com.qmth.themis.business.entity.TMRocketMessage;
|
|
import com.qmth.themis.business.entity.TMRocketMessage;
|
|
import com.qmth.themis.business.service.TMRocketMessageService;
|
|
import com.qmth.themis.business.service.TMRocketMessageService;
|
|
|
|
+import com.qmth.themis.business.templete.TaskExportTemplete;
|
|
import com.qmth.themis.business.templete.TaskImportTemplete;
|
|
import com.qmth.themis.business.templete.TaskImportTemplete;
|
|
import com.qmth.themis.business.templete.impl.TaskExamStudentImportTemplete;
|
|
import com.qmth.themis.business.templete.impl.TaskExamStudentImportTemplete;
|
|
|
|
+import com.qmth.themis.business.templete.impl.TaskRoomCodeExportTemplete;
|
|
import com.qmth.themis.business.templete.impl.TaskRoomCodeImportTemplete;
|
|
import com.qmth.themis.business.templete.impl.TaskRoomCodeImportTemplete;
|
|
import com.qmth.themis.business.threadPool.MyThreadPool;
|
|
import com.qmth.themis.business.threadPool.MyThreadPool;
|
|
import com.qmth.themis.business.util.JacksonUtil;
|
|
import com.qmth.themis.business.util.JacksonUtil;
|
|
@@ -42,7 +44,7 @@ import java.util.concurrent.TimeUnit;
|
|
* @Date: 2020/7/2
|
|
* @Date: 2020/7/2
|
|
*/
|
|
*/
|
|
@Service
|
|
@Service
|
|
-public class RocketTaskConsumer implements MessageListenerConcurrently {
|
|
|
|
|
|
+public class RocketTaskConsumer {
|
|
private final static Logger log = LoggerFactory.getLogger(RocketTaskConsumer.class);
|
|
private final static Logger log = LoggerFactory.getLogger(RocketTaskConsumer.class);
|
|
|
|
|
|
@Resource
|
|
@Resource
|
|
@@ -54,54 +56,61 @@ public class RocketTaskConsumer implements MessageListenerConcurrently {
|
|
@Resource
|
|
@Resource
|
|
MyThreadPool myThreadPool;
|
|
MyThreadPool myThreadPool;
|
|
|
|
|
|
- @Override
|
|
|
|
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
|
- MqDto mqDto = null;
|
|
|
|
- try {
|
|
|
|
- long threadId = Thread.currentThread().getId();
|
|
|
|
- String threadName = Thread.currentThread().getName();
|
|
|
|
- Gson gson = new Gson();
|
|
|
|
- for (MessageExt messageExt : msgs) {
|
|
|
|
- log.info(":{}-:{} task ExamStudentImport Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
|
- mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
|
- log.info(":{}-:{} task ExamStudentImport Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
|
- log.info(":{}-:{} task ExamStudentImport Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
|
|
- if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.getMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
|
|
|
|
- Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
|
|
|
|
- String tag = mqDto.getTag();
|
|
|
|
- myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
|
|
|
|
- TaskImportTemplete taskImportTemplete = null;
|
|
|
|
- if (tag.contains("examStudentImport")) {
|
|
|
|
- taskImportTemplete = new TaskExamStudentImportTemplete();
|
|
|
|
- } else if (tag.contains("roomCodeImport")) {
|
|
|
|
- taskImportTemplete = new TaskRoomCodeImportTemplete();
|
|
|
|
- }
|
|
|
|
- try {
|
|
|
|
- taskImportTemplete.importTask(map);
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- e.printStackTrace();
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
|
- mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
|
|
|
|
- TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
|
- tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
|
- redisUtil.deleteMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
|
- } else {
|
|
|
|
- log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
|
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 导入任务监听
|
|
|
|
+ */
|
|
|
|
+ @Service
|
|
|
|
+ public class ImportTask implements MessageListenerConcurrently {
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
|
+ MqDto mqDto = null;
|
|
|
|
+ try {
|
|
|
|
+ long threadId = Thread.currentThread().getId();
|
|
|
|
+ String threadName = Thread.currentThread().getName();
|
|
|
|
+ Gson gson = new Gson();
|
|
|
|
+ for (MessageExt messageExt : msgs) {
|
|
|
|
+ log.info(":{}-:{} task import Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
|
+ mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
|
+ log.info(":{}-:{} task import Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
|
+// log.info(":{}-:{} task import Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
|
|
+ if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.getMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
|
|
|
|
+ Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
|
|
|
|
+ String tag = mqDto.getTag();
|
|
|
|
+ myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
|
|
|
|
+ TaskImportTemplete taskImportTemplete = null;
|
|
|
|
+ if (tag.contains("examStudentImport")) {
|
|
|
|
+ taskImportTemplete = new TaskExamStudentImportTemplete();
|
|
|
|
+ } else if (tag.contains("roomCodeImport")) {
|
|
|
|
+ taskImportTemplete = new TaskRoomCodeImportTemplete();
|
|
|
|
+ }
|
|
|
|
+ try {
|
|
|
|
+ taskImportTemplete.importTask(map);
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
|
+ mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
|
|
|
|
+ TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
|
+ tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
|
+ redisUtil.deleteMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
|
+ } else {
|
|
|
|
+ log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
|
+ } finally {
|
|
|
|
+ if (Objects.nonNull(mqDto)) {
|
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- } catch (Exception e) {
|
|
|
|
- e.printStackTrace();
|
|
|
|
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
|
- } finally {
|
|
|
|
- if (Objects.nonNull(mqDto)) {
|
|
|
|
- redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId());
|
|
|
|
- }
|
|
|
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
}
|
|
}
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -121,7 +130,7 @@ public class RocketTaskConsumer implements MessageListenerConcurrently {
|
|
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
- defaultMQPushConsumer.registerMessageListener(RocketTaskConsumer.this::consumeMessage);
|
|
|
|
|
|
+ defaultMQPushConsumer.registerMessageListener(new ImportTask()::consumeMessage);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -142,7 +151,82 @@ public class RocketTaskConsumer implements MessageListenerConcurrently {
|
|
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
- defaultMQPushConsumer.registerMessageListener(RocketTaskConsumer.this::consumeMessage);
|
|
|
|
|
|
+ defaultMQPushConsumer.registerMessageListener(new ImportTask()::consumeMessage);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 导出任务
|
|
|
|
+ */
|
|
|
|
+ public class exportTask implements MessageListenerConcurrently {
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
|
+ MqDto mqDto = null;
|
|
|
|
+ try {
|
|
|
|
+ long threadId = Thread.currentThread().getId();
|
|
|
|
+ String threadName = Thread.currentThread().getName();
|
|
|
|
+ Gson gson = new Gson();
|
|
|
|
+ for (MessageExt messageExt : msgs) {
|
|
|
|
+ log.info(":{}-:{} task export Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
|
+ mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
|
+ log.info(":{}-:{} task export Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
|
+// log.info(":{}-:{} task ExamStudentImport Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
|
|
+ if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.getMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
|
|
|
|
+ Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
|
|
|
|
+ String tag = mqDto.getTag();
|
|
|
|
+ myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
|
|
|
|
+ TaskExportTemplete taskExportTemplete = null;
|
|
|
|
+ if (tag.contains("roomCodeExport")) {
|
|
|
|
+ taskExportTemplete = new TaskRoomCodeExportTemplete();
|
|
|
|
+ }
|
|
|
|
+ try {
|
|
|
|
+ taskExportTemplete.exportTask(map);
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
|
+ mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
|
|
|
|
+ TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
|
+ tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
|
+ redisUtil.deleteMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
|
+ } else {
|
|
|
|
+ log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
|
+ } finally {
|
|
|
|
+ if (Objects.nonNull(mqDto)) {
|
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 考场导出
|
|
|
|
+ */
|
|
|
|
+ @Service
|
|
|
|
+ @RocketMQMessageListener(consumerGroup = "${mq.config.taskConsumerRoomCodeExportGroup}", topic = "${mq.config.taskTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.taskTopicRoomCodeExportTag}")
|
|
|
|
+ public class taskConsumerRoomCodeExport implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void onMessage(Message message) {
|
|
|
|
+ //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
|
+ defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
|
+ defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
|
+ defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
|
+ defaultMQPushConsumer.registerMessageListener(new exportTask()::consumeMessage);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|