瀏覽代碼

quartz mq

wangliang 4 年之前
父節點
當前提交
95422080a4

+ 20 - 5
themis-backend/src/main/java/com/qmth/themis/backend/api/TEExamActivityController.java

@@ -2,18 +2,23 @@ package com.qmth.themis.backend.api;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.qmth.themis.backend.config.DictionaryConfig;
 import com.qmth.themis.backend.util.ServletUtil;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.entity.TBUser;
+import com.qmth.themis.business.entity.TEExam;
 import com.qmth.themis.business.entity.TEExamActivity;
 import com.qmth.themis.business.enums.FieldUniqueEnum;
+import com.qmth.themis.business.enums.MqEnum;
 import com.qmth.themis.business.service.TEExamActivityService;
+import com.qmth.themis.business.service.TEExamService;
 import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.common.contanst.Constants;
 import com.qmth.themis.common.enums.ExceptionResultEnum;
 import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.ResultUtil;
+import com.qmth.themis.mq.dto.MqDto;
 import com.qmth.themis.mq.service.MqDtoService;
 import io.swagger.annotations.*;
 import org.springframework.dao.DuplicateKeyException;
@@ -21,10 +26,7 @@ import org.springframework.transaction.annotation.Transactional;
 import org.springframework.web.bind.annotation.*;
 
 import javax.annotation.Resource;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 
 /**
  * @Description: 考试场次 前端控制器
@@ -44,6 +46,12 @@ public class TEExamActivityController {
     @Resource
     MqDtoService mqDtoService;
 
+    @Resource
+    DictionaryConfig dictionaryConfig;
+
+    @Resource
+    TEExamService teExamService;
+
     @ApiOperation(value = "考试场次修改/新增接口")
     @RequestMapping(value = "/save", method = RequestMethod.POST)
     @Transactional
@@ -63,7 +71,14 @@ public class TEExamActivityController {
                 }
             });
             teExamActivityService.saveOrUpdateBatch(teExamActivityList);
-
+            TEExam teExam = teExamService.getById(teExamActivityList.get(0).getExamId());
+            //新增quartz任务,发送mq消息start
+            Map<String, Object> prop = new HashMap<>();
+            prop.put("oper", "insert");
+            prop.put("exam", teExam);
+            MqDto mqDto = new MqDto(dictionaryConfig.mqConfigDomain().getQuartzTopic(), dictionaryConfig.mqConfigDomain().getQuartzTopicExamActivityTag(), JacksonUtil.parseJson(teExamActivityList), MqEnum.QUARTZ_LOG, String.valueOf(teExam.getId()), prop, tbUser.getName());
+            mqDtoService.assembleSendOneWayMsg(mqDto);
+            //新增quartz任务,发送mq消息end
         } catch (Exception e) {
             e.printStackTrace();
             if (e instanceof DuplicateKeyException) {

+ 8 - 0
themis-exam/src/main/resources/application.properties

@@ -175,6 +175,14 @@ mq.config.websocketUnNormalConsumerGroup=${mq.config.server}-group-websocketUnNo
 mq.config.websocketUnNormalTopicOeTag=oe
 mq.config.websocketUnNormalConsumerOeGroup=${mq.config.websocketUnNormalConsumerGroup}-${mq.config.websocketUnNormalTopicOeTag}
 
+#quartz
+mq.config.quartzTopic=${mq.config.server}-topic-quartz
+mq.config.quartzConsumerGroup=${mq.config.server}-group-quartz
+
+#quartz-\u8003\u8BD5\u573A\u6B21task
+mq.config.quartzTopicExamActivityTag=examActivity
+mq.config.quartzConsumerExamActivityGroup=${mq.config.quartzConsumerGroup}-${mq.config.quartzTopicExamActivityTag}
+
 #\u963F\u91CC\u4E91OSS\u914D\u7F6E
 aliyun.oss.name=oss-cn-shenzhen.aliyuncs.com
 aliyun.oss.endpoint=http://${aliyun.oss.name}

+ 9 - 11
themis-task/src/main/java/com/qmth/themis/task/listener/RocketQuartzConsumer.java

@@ -17,9 +17,7 @@ import com.qmth.themis.task.config.DictionaryConfig;
 import com.qmth.themis.task.quartz.ExamActivityJob;
 import com.qmth.themis.task.service.QuartzService;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.listener.*;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -35,14 +33,14 @@ import javax.annotation.Resource;
 import java.util.*;
 
 /**
- * @Description: 普通消息监听 task
+ * @Description: 普通消息监听 quartz
  * @Param:
  * @return:
  * @Author: wangliang
  * @Date: 2020/7/2
  */
 @Service
-public class RocketQuartzConsumer implements MessageListenerConcurrently {
+public class RocketQuartzConsumer implements MessageListenerOrderly {
     private final static Logger log = LoggerFactory.getLogger(RocketQuartzConsumer.class);
 
     @Resource
@@ -58,10 +56,10 @@ public class RocketQuartzConsumer implements MessageListenerConcurrently {
     DictionaryConfig dictionaryConfig;
 
     /**
-     * 导入任务监听
+     * 考试场次监听
      */
     @Override
-    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
         MqDto mqDto = null;
         try {
             long threadId = Thread.currentThread().getId();
@@ -127,22 +125,22 @@ public class RocketQuartzConsumer implements MessageListenerConcurrently {
                         TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
                         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
                         redisUtil.delete(SystemConstant.QUARTZLOG_TOPIC_BUFFER_LIST + mqDto.getId());
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                        return ConsumeOrderlyStatus.SUCCESS;
                     } else {
                         log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
-                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
                     }
                 }
             }
         } catch (Exception e) {
             e.printStackTrace();
-            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
         } finally {
             if (Objects.nonNull(mqDto)) {
                 redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
             }
         }
-        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+        return ConsumeOrderlyStatus.SUCCESS;//成功
     }
 
     /**