|
@@ -1,176 +0,0 @@
|
|
-package com.qmth.themis.task.listener;
|
|
|
|
-
|
|
|
|
-import com.alibaba.fastjson.JSON;
|
|
|
|
-import com.alibaba.fastjson.JSONArray;
|
|
|
|
-import com.alibaba.fastjson.JSONObject;
|
|
|
|
-import com.google.gson.Gson;
|
|
|
|
-import com.qmth.themis.business.constant.SpringContextHolder;
|
|
|
|
-import com.qmth.themis.business.constant.SystemConstant;
|
|
|
|
-import com.qmth.themis.business.entity.TEExam;
|
|
|
|
-import com.qmth.themis.business.entity.TEExamActivity;
|
|
|
|
-import com.qmth.themis.business.entity.TMRocketMessage;
|
|
|
|
-import com.qmth.themis.business.service.TMRocketMessageService;
|
|
|
|
-import com.qmth.themis.business.util.JacksonUtil;
|
|
|
|
-import com.qmth.themis.business.util.RedisUtil;
|
|
|
|
-import com.qmth.themis.common.contanst.Constants;
|
|
|
|
-import com.qmth.themis.common.exception.BusinessException;
|
|
|
|
-import com.qmth.themis.mq.dto.MqDto;
|
|
|
|
-import com.qmth.themis.task.config.DictionaryConfig;
|
|
|
|
-import com.qmth.themis.task.quartz.ExamActivityJob;
|
|
|
|
-import com.qmth.themis.task.service.QuartzService;
|
|
|
|
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|
|
|
-import org.apache.rocketmq.client.consumer.listener.*;
|
|
|
|
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
|
|
|
-import org.apache.rocketmq.common.message.Message;
|
|
|
|
-import org.apache.rocketmq.common.message.MessageExt;
|
|
|
|
-import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
|
|
-import org.apache.rocketmq.spring.annotation.SelectorType;
|
|
|
|
-import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
|
|
-import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
|
|
|
|
-import org.slf4j.Logger;
|
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
|
-import org.springframework.stereotype.Service;
|
|
|
|
-import org.springframework.transaction.annotation.Transactional;
|
|
|
|
-
|
|
|
|
-import javax.annotation.Resource;
|
|
|
|
-import java.util.*;
|
|
|
|
-
|
|
|
|
-/**
|
|
|
|
- * @Description: 普通消息监听 quartz
|
|
|
|
- * @Param:
|
|
|
|
- * @return:
|
|
|
|
- * @Author: wangliang
|
|
|
|
- * @Date: 2020/7/2
|
|
|
|
- */
|
|
|
|
-@Service
|
|
|
|
-public class RocketQuartzConsumer implements MessageListenerOrderly {
|
|
|
|
- private final static Logger log = LoggerFactory.getLogger(RocketQuartzConsumer.class);
|
|
|
|
-
|
|
|
|
- @Resource
|
|
|
|
- RedisUtil redisUtil;
|
|
|
|
-
|
|
|
|
- @Resource
|
|
|
|
- TMRocketMessageService tmRocketMessageService;
|
|
|
|
-
|
|
|
|
- @Resource
|
|
|
|
- QuartzService quartzService;
|
|
|
|
-
|
|
|
|
- @Resource
|
|
|
|
- DictionaryConfig dictionaryConfig;
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 考试场次监听
|
|
|
|
- */
|
|
|
|
- @Override
|
|
|
|
- @Transactional
|
|
|
|
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
|
|
|
|
- MqDto mqDto = null;
|
|
|
|
- try {
|
|
|
|
- long threadId = Thread.currentThread().getId();
|
|
|
|
- String threadName = Thread.currentThread().getName();
|
|
|
|
- Gson gson = new Gson();
|
|
|
|
- for (MessageExt messageExt : msgs) {
|
|
|
|
- log.info(":{}-:{} quartz Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
|
- mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
|
- log.info(":{}-:{} quartz Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
|
- int reconsumeTime = messageExt.getReconsumeTimes();
|
|
|
|
- if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
|
|
|
|
- //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
|
|
|
|
- mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
|
|
|
|
- TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
|
- tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
|
- redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
|
- } else {
|
|
|
|
- if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
|
- Map<String, Object> tranMap = mqDto.getProperties();
|
|
|
|
- String oper = String.valueOf(tranMap.get("oper"));
|
|
|
|
- Object o = JacksonUtil.parseJson(tranMap.get("exam"));
|
|
|
|
- TEExam teExam = JSONObject.toJavaObject(JSONObject.parseObject(String.valueOf(o)), TEExam.class);
|
|
|
|
- mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
|
- JSONArray jsonArray = (JSONArray) JSONArray.parse(String.valueOf(mqDto.getBody()));
|
|
|
|
- for (int i = 0; i < jsonArray.size(); i++) {
|
|
|
|
- TEExamActivity teExamActivity = JSONObject.toJavaObject((JSON) jsonArray.get(i), TEExamActivity.class);
|
|
|
|
- if (Objects.equals("delete", oper)) {
|
|
|
|
- quartzService.deleteJob(teExamActivity.getCode(), dictionaryConfig.quartzConfigDomain().getExamActivityJobGroupName());
|
|
|
|
- } else {
|
|
|
|
- Integer forceFinish = teExam.getForceFinish();
|
|
|
|
- Date startTime = teExamActivity.getStartTime();
|
|
|
|
- Date finishTime = teExamActivity.getFinishTime();
|
|
|
|
- if (Objects.nonNull(forceFinish) && forceFinish.intValue() == 1) {//强制收卷
|
|
|
|
- Integer activityMaxDurationSeconds = Objects.nonNull(teExamActivity.getMaxDurationSeconds()) ? teExamActivity.getMaxDurationSeconds() : null;
|
|
|
|
- Integer maxDurationSeconds = Objects.nonNull(teExam.getMaxDurationSeconds()) ? teExam.getMaxDurationSeconds() : null;
|
|
|
|
- Integer finalMaxDurationSeconds = Objects.nonNull(activityMaxDurationSeconds) ? activityMaxDurationSeconds : maxDurationSeconds;
|
|
|
|
- Calendar calendar = Calendar.getInstance();
|
|
|
|
- if (Objects.nonNull(finalMaxDurationSeconds)) {
|
|
|
|
- calendar.setTime(startTime);
|
|
|
|
- calendar.add(Calendar.SECOND, activityMaxDurationSeconds.intValue());
|
|
|
|
- } else {
|
|
|
|
- calendar.setTime(finishTime);
|
|
|
|
- }
|
|
|
|
- if (calendar.getTime().getTime() > System.currentTimeMillis()) {
|
|
|
|
-// calendar.setTime(new Date());
|
|
|
|
- int year = calendar.get(Calendar.YEAR);//获取年份
|
|
|
|
- int month = calendar.get(Calendar.MONTH) + 1;//获取月份
|
|
|
|
- int day = calendar.get(Calendar.DATE);//获取日
|
|
|
|
- int hour = calendar.get(Calendar.HOUR_OF_DAY);//小时
|
|
|
|
- int minute = calendar.get(Calendar.MINUTE);//分
|
|
|
|
- int second = calendar.get(Calendar.SECOND);//秒
|
|
|
|
-// String cron = (second + 20) + " " + (minute) + " " + hour + " " + day + " " + month + " ? " + year;
|
|
|
|
- String cron = second + " " + (minute + 1) + " " + hour + " " + day + " " + month + " ? " + year;
|
|
|
|
- log.info("cron:{}", cron);
|
|
|
|
- String activityCode = teExamActivity.getCode();
|
|
|
|
- //执行一次性延时任务
|
|
|
|
- Map mapJob = new HashMap();
|
|
|
|
- mapJob.put("name", activityCode);
|
|
|
|
- quartzService.deleteJob(activityCode, dictionaryConfig.quartzConfigDomain().getExamActivityJobGroupName());
|
|
|
|
- quartzService.addJob(SpringContextHolder.getBean(ExamActivityJob.class).getClass(), activityCode, dictionaryConfig.quartzConfigDomain().getExamActivityJobGroupName(), cron, mapJob);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
|
- tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
|
- redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
|
- return ConsumeOrderlyStatus.SUCCESS;
|
|
|
|
- } else {
|
|
|
|
- log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
|
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- e.printStackTrace();
|
|
|
|
- if (e instanceof BusinessException) {
|
|
|
|
- throw new BusinessException(e.getMessage());
|
|
|
|
- } else {
|
|
|
|
- throw new RuntimeException(e);
|
|
|
|
- }
|
|
|
|
- } finally {
|
|
|
|
- if (Objects.nonNull(mqDto)) {
|
|
|
|
- redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return ConsumeOrderlyStatus.SUCCESS;//成功
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * quartz
|
|
|
|
- */
|
|
|
|
- @Service
|
|
|
|
- @RocketMQMessageListener(consumerGroup = "${mq.config.quartzConsumerExamActivityGroup}", topic = "${mq.config.quartzTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.quartzTopicExamActivityTag}")
|
|
|
|
- public class taskConsumerQuartzStudent implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void onMessage(Message message) {
|
|
|
|
- //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
|
- defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
|
- defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
|
- defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
|
- defaultMQPushConsumer.registerMessageListener(RocketQuartzConsumer.this::consumeMessage);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-}
|
|
|