|
@@ -5,7 +5,8 @@ import com.qmth.themis.business.constant.SystemConstant;
|
|
|
import com.qmth.themis.business.entity.TMRocketMessage;
|
|
|
import com.qmth.themis.business.service.TMRocketMessageService;
|
|
|
import com.qmth.themis.business.templete.TaskImportTemplete;
|
|
|
-import com.qmth.themis.business.templete.impl.TaskExamStudentTemplete;
|
|
|
+import com.qmth.themis.business.templete.impl.TaskExamStudentImportTemplete;
|
|
|
+import com.qmth.themis.business.templete.impl.TaskRoomCodeImportTemplete;
|
|
|
import com.qmth.themis.business.threadPool.MyThreadPool;
|
|
|
import com.qmth.themis.business.util.JacksonUtil;
|
|
|
import com.qmth.themis.business.util.RedisUtil;
|
|
@@ -41,7 +42,7 @@ import java.util.concurrent.TimeUnit;
|
|
|
* @Date: 2020/7/2
|
|
|
*/
|
|
|
@Service
|
|
|
-public class RocketTaskConsumer implements MessageListenerConcurrently {
|
|
|
+public class RocketTaskConsumer {
|
|
|
private final static Logger log = LoggerFactory.getLogger(RocketTaskConsumer.class);
|
|
|
|
|
|
@Resource
|
|
@@ -53,54 +54,78 @@ public class RocketTaskConsumer implements MessageListenerConcurrently {
|
|
|
@Resource
|
|
|
MyThreadPool myThreadPool;
|
|
|
|
|
|
- @Override
|
|
|
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
- MqDto mqDto = null;
|
|
|
- try {
|
|
|
- long threadId = Thread.currentThread().getId();
|
|
|
- String threadName = Thread.currentThread().getName();
|
|
|
- Gson gson = new Gson();
|
|
|
- for (MessageExt messageExt : msgs) {
|
|
|
- log.info(":{}-:{} taskConsumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
-// MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
|
|
|
- mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
- log.info(":{}-:{} taskConsumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
- log.info(":{}-:{} taskConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
|
- if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.getMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
|
|
|
- Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
|
|
|
- myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
|
|
|
- TaskImportTemplete examStudentTemplete = new TaskExamStudentTemplete();
|
|
|
- try {
|
|
|
- examStudentTemplete.importTask(map);
|
|
|
- } catch (IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
+ /**
|
|
|
+ * 考生导入
|
|
|
+ */
|
|
|
+ @Service
|
|
|
+ @RocketMQMessageListener(consumerGroup = "${mq.config.taskConsumerExamStudentImportGroup}", topic = "${mq.config.taskTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.taskTopicExamStudentImportTag}")
|
|
|
+ public class taskConsumerExamStudent implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onMessage(Message message) {
|
|
|
+ //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
+ defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
+ defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
+ defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
+ defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
+ MqDto mqDto = null;
|
|
|
+ try {
|
|
|
+ long threadId = Thread.currentThread().getId();
|
|
|
+ String threadName = Thread.currentThread().getName();
|
|
|
+ Gson gson = new Gson();
|
|
|
+ for (MessageExt messageExt : msgs) {
|
|
|
+ log.info(":{}-:{} task ExamStudentImport Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
+ mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
+ log.info(":{}-:{} task ExamStudentImport Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
+ log.info(":{}-:{} task ExamStudentImport Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
|
+ if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.getMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
|
|
|
+ Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
|
|
|
+ myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
|
|
|
+ TaskImportTemplete examStudentImportTemplete = new TaskExamStudentImportTemplete();
|
|
|
+ try {
|
|
|
+ examStudentImportTemplete.importTask(map);
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
+ mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
|
|
|
+ TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
+ tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
+ redisUtil.deleteMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
+ } else {
|
|
|
+ log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
+ }
|
|
|
}
|
|
|
- });
|
|
|
- mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
- mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
|
|
|
- TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
- tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
- redisUtil.deleteMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
- } else {
|
|
|
- log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
+ } finally {
|
|
|
+ if (Objects.nonNull(mqDto)) {
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
}
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
- } finally {
|
|
|
- if (Objects.nonNull(mqDto)) {
|
|
|
- redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId());
|
|
|
- }
|
|
|
+ });
|
|
|
}
|
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 考场导入
|
|
|
+ */
|
|
|
@Service
|
|
|
- @RocketMQMessageListener(consumerGroup = "${mq.config.taskConsumerExamStudentGroup}", topic = "${mq.config.taskTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.taskTopicExamStudentTag}")
|
|
|
- public class taskConsumerExamStudent implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
+ @RocketMQMessageListener(consumerGroup = "${mq.config.taskConsumerRoomCodeImportGroup}", topic = "${mq.config.taskTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.taskTopicRoomCodeImportTag}")
|
|
|
+ public class taskConsumerRoomCodeImport implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
|
|
|
@Override
|
|
|
public void onMessage(Message message) {
|
|
@@ -112,25 +137,52 @@ public class RocketTaskConsumer implements MessageListenerConcurrently {
|
|
|
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
- defaultMQPushConsumer.registerMessageListener(RocketTaskConsumer.this::consumeMessage);
|
|
|
+ defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
+ MqDto mqDto = null;
|
|
|
+ try {
|
|
|
+ long threadId = Thread.currentThread().getId();
|
|
|
+ String threadName = Thread.currentThread().getName();
|
|
|
+ Gson gson = new Gson();
|
|
|
+ for (MessageExt messageExt : msgs) {
|
|
|
+ log.info(":{}-:{} task roomCodeImport Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
+ mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
+ log.info(":{}-:{} task roomCodeImport Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
+ log.info(":{}-:{} task roomCodeImport Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
|
+ if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.getMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
|
|
|
+ Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
|
|
|
+ myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
|
|
|
+ TaskImportTemplete roomCodeImportTemplete = new TaskRoomCodeImportTemplete();
|
|
|
+ try {
|
|
|
+ roomCodeImportTemplete.importTask(map);
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
+ mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
|
|
|
+ TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
+ tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
+ redisUtil.deleteMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
+ } else {
|
|
|
+ log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
+ } finally {
|
|
|
+ if (Objects.nonNull(mqDto)) {
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
-// @Service
|
|
|
-// @RocketMQMessageListener(consumerGroup = "${mq.config.userLogConsumerStudentGroup}", topic = "${mq.config.userLogTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.userLogTopicStudentTag}")
|
|
|
-// public class sessionConsumerStudentLog implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
-//
|
|
|
-// @Override
|
|
|
-// public void onMessage(Message message) {
|
|
|
-// //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
|
|
|
-// }
|
|
|
-//
|
|
|
-// @Override
|
|
|
-// public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
-// defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
-// defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
-// defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
-// defaultMQPushConsumer.registerMessageListener(RocketTaskConsumer.this::consumeMessage);
|
|
|
-// }
|
|
|
-// }
|
|
|
-}
|
|
|
+}
|