|
@@ -0,0 +1,168 @@
|
|
|
+package com.qmth.themis.task.listener;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.google.gson.Gson;
|
|
|
+import com.qmth.themis.business.constant.SystemConstant;
|
|
|
+import com.qmth.themis.business.entity.TEExam;
|
|
|
+import com.qmth.themis.business.entity.TEExamActivity;
|
|
|
+import com.qmth.themis.business.entity.TMRocketMessage;
|
|
|
+import com.qmth.themis.business.service.TMRocketMessageService;
|
|
|
+import com.qmth.themis.business.util.JacksonUtil;
|
|
|
+import com.qmth.themis.business.util.RedisUtil;
|
|
|
+import com.qmth.themis.common.contanst.Constants;
|
|
|
+import com.qmth.themis.mq.dto.MqDto;
|
|
|
+import com.qmth.themis.task.config.DictionaryConfig;
|
|
|
+import com.qmth.themis.task.quartz.ExamActivityJob;
|
|
|
+import com.qmth.themis.task.service.QuartzService;
|
|
|
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
|
|
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
|
|
+import org.apache.rocketmq.common.message.Message;
|
|
|
+import org.apache.rocketmq.common.message.MessageExt;
|
|
|
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
|
+import org.apache.rocketmq.spring.annotation.SelectorType;
|
|
|
+import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
|
+import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.util.*;
|
|
|
+
|
|
|
+
|
|
|
+ * @Description: 普通消息监听 task
|
|
|
+ * @Param:
|
|
|
+ * @return:
|
|
|
+ * @Author: wangliang
|
|
|
+ * @Date: 2020/7/2
|
|
|
+ */
|
|
|
+@Service
|
|
|
+public class RocketQuartzConsumer implements MessageListenerConcurrently {
|
|
|
+ private final static Logger log = LoggerFactory.getLogger(RocketQuartzConsumer.class);
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ RedisUtil redisUtil;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ TMRocketMessageService tmRocketMessageService;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ QuartzService quartzService;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ DictionaryConfig dictionaryConfig;
|
|
|
+
|
|
|
+
|
|
|
+ * 导入任务监听
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
+ MqDto mqDto = null;
|
|
|
+ try {
|
|
|
+ long threadId = Thread.currentThread().getId();
|
|
|
+ String threadName = Thread.currentThread().getName();
|
|
|
+ Gson gson = new Gson();
|
|
|
+ for (MessageExt messageExt : msgs) {
|
|
|
+ log.info(":{}-:{} quartz Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
+ mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
+ log.info(":{}-:{} quartz Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
+ int reconsumeTime = messageExt.getReconsumeTimes();
|
|
|
+ if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
|
|
|
+
|
|
|
+ mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
|
|
|
+ TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
+ tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
+ redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
+ } else {
|
|
|
+ if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.QUARTZLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
+ Map<String, Object> tranMap = mqDto.getProperties();
|
|
|
+ String oper = String.valueOf(tranMap.get("oper"));
|
|
|
+ Object o = JacksonUtil.parseJson(tranMap.get("exam"));
|
|
|
+ TEExam teExam = JSONObject.toJavaObject(JSONObject.parseObject(String.valueOf(o)), TEExam.class);
|
|
|
+ mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
+ JSONArray jsonArray = (JSONArray) JSONArray.parse(String.valueOf(mqDto.getBody()));
|
|
|
+ for (int i = 0; i < jsonArray.size(); i++) {
|
|
|
+ TEExamActivity teExamActivity = JSONObject.toJavaObject((JSON) jsonArray.get(i), TEExamActivity.class);
|
|
|
+ if (Objects.equals("delete", oper)) {
|
|
|
+ quartzService.deleteJob(teExamActivity.getCode(), dictionaryConfig.quartzConfigDomain().getExamActivityJobGroupName());
|
|
|
+ } else {
|
|
|
+ Integer forceFinish = teExam.getForceFinish();
|
|
|
+ Date startTime = teExamActivity.getStartTime();
|
|
|
+ Date finishTime = teExamActivity.getFinishTime();
|
|
|
+ if (Objects.nonNull(forceFinish) && forceFinish.intValue() == 1) {
|
|
|
+ Integer activityMaxDurationSeconds = Objects.nonNull(teExamActivity.getMaxDurationSeconds()) ? teExamActivity.getMaxDurationSeconds() : null;
|
|
|
+ Integer maxDurationSeconds = Objects.nonNull(teExam.getMaxDurationSeconds()) ? teExam.getMaxDurationSeconds() : null;
|
|
|
+ Integer finalMaxDurationSeconds = Objects.nonNull(activityMaxDurationSeconds) ? activityMaxDurationSeconds : maxDurationSeconds;
|
|
|
+ Calendar calendar = Calendar.getInstance();
|
|
|
+ if (Objects.nonNull(finalMaxDurationSeconds)) {
|
|
|
+ calendar.setTime(startTime);
|
|
|
+ calendar.add(Calendar.SECOND, activityMaxDurationSeconds.intValue());
|
|
|
+ } else {
|
|
|
+ calendar.setTime(finishTime);
|
|
|
+ }
|
|
|
+
|
|
|
+ int year = calendar.get(Calendar.YEAR);
|
|
|
+ int month = calendar.get(Calendar.MONTH) + 1;
|
|
|
+ int day = calendar.get(Calendar.DATE);
|
|
|
+ int hour = calendar.get(Calendar.HOUR_OF_DAY);
|
|
|
+ int minute = calendar.get(Calendar.MINUTE);
|
|
|
+ int second = calendar.get(Calendar.SECOND);
|
|
|
+
|
|
|
+ String cron = second + " " + (minute + 1) + " " + hour + " " + day + " " + month + " ? " + year;
|
|
|
+ log.info("cron:{}", cron);
|
|
|
+ String activityCode = teExamActivity.getCode();
|
|
|
+
|
|
|
+ Map mapJob = new HashMap();
|
|
|
+ mapJob.put("name", activityCode);
|
|
|
+ quartzService.deleteJob(activityCode, dictionaryConfig.quartzConfigDomain().getExamActivityJobGroupName());
|
|
|
+ quartzService.addJob(ExamActivityJob.class, activityCode, dictionaryConfig.quartzConfigDomain().getExamActivityJobGroupName(), cron, mapJob);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
+ tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
+ redisUtil.delete(SystemConstant.QUARTZLOG_TOPIC_BUFFER_LIST + mqDto.getId());
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
+ } else {
|
|
|
+ log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
|
|
+ } finally {
|
|
|
+ if (Objects.nonNull(mqDto)) {
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * quartz
|
|
|
+ */
|
|
|
+ @Service
|
|
|
+ @RocketMQMessageListener(consumerGroup = "${mq.config.quartzConsumerExamActivityGroup}", topic = "${mq.config.quartzTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.quartzTopicExamActivityTag}")
|
|
|
+ public class taskConsumerQuartzStudent implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onMessage(Message message) {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
+ defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);
|
|
|
+ defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
+ defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);
|
|
|
+ defaultMQPushConsumer.registerMessageListener(RocketQuartzConsumer.this::consumeMessage);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|