Browse Source

构建mq延时消息公用

wangliang 4 years ago
parent
commit
422dfffa52

+ 10 - 0
themis-business/src/main/java/com/qmth/themis/business/service/MqDtoService.java

@@ -2,6 +2,8 @@ package com.qmth.themis.business.service;
 
 import com.qmth.themis.business.dto.MqDto;
 
+import java.util.Map;
+
 /**
  * @Description: mqdto 服务类
  * @Param:
@@ -42,4 +44,12 @@ public interface MqDtoService {
      * @return
      */
     public MqDto assembleSendTranMsg(MqDto mqDto);
+
+    /**
+     * 构建mq延时消息
+     *
+     * @param level
+     * @return
+     */
+    public Map<String, Object> buildMqDelayMsg(String level);
 }

+ 3 - 15
themis-business/src/main/java/com/qmth/themis/business/service/impl/CommonServiceImpl.java

@@ -16,9 +16,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -60,20 +58,10 @@ public class CommonServiceImpl implements CommonService {
             if (score == null) {
                 log.info("score is null ,recordId:" + recordId);
                 //算分未完成的 发送10秒延迟消息
-                Map<String, Object> transMap = new HashMap<String, Object>();
-                transMap.put("recordId", recordId);
-                String level = "10s";
-                Integer time = SystemConstant.mqDelayLevel.get(level);
-                LocalDateTime dt = LocalDateTime.now();
-                dt = dt.plusSeconds(Long.parseLong(level.replace("s", "")));
-                Map<String, Object> propMap = new HashMap<String, Object>();
-                propMap.put("timeOut", time);
-                propMap.put("mqExecTime", dt.toInstant(ZoneOffset.of("+8")).toEpochMilli());
+                Map<String, Object> propMap = mqDtoService.buildMqDelayMsg("10s");
                 MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.EXAM_RECORD_PERSISTED.name(),
-                        transMap, MqTagEnum.EXAM_RECORD_PERSISTED, recordId.toString(), propMap, recordId.toString());
-
+                        Collections.singletonMap("recordId", recordId), MqTagEnum.EXAM_RECORD_PERSISTED, recordId.toString(), propMap, recordId.toString());
                 mqDtoService.assembleSendAsyncDelayMsg(mqDto);
-
                 return;
             }
 

+ 26 - 1
themis-business/src/main/java/com/qmth/themis/business/service/impl/MqDtoServiceImpl.java

@@ -2,7 +2,6 @@ package com.qmth.themis.business.service.impl;
 
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.dto.MqDto;
-import com.qmth.themis.business.enums.MqTagEnum;
 import com.qmth.themis.business.service.MqDtoService;
 import com.qmth.themis.business.service.ProducerServer;
 import com.qmth.themis.business.util.RedisUtil;
@@ -11,6 +10,10 @@ import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -118,6 +121,28 @@ public class MqDtoServiceImpl implements MqDtoService {
         return null;
     }
 
+    /**
+     * 构建mq延时消息
+     *
+     * @param level
+     * @return
+     */
+    @Override
+    public Map<String, Object> buildMqDelayMsg(String level) {
+        Map<String, Object> tranMap = new HashMap<>();
+        //发送延时mq消息start
+        Integer time = SystemConstant.mqDelayLevel.get(level);
+        LocalDateTime dt = LocalDateTime.now();
+        if (level.contains("m")) {
+            dt = dt.plusMinutes(Long.parseLong(level.replace("m", "")));
+        } else {
+            dt = dt.plusSeconds(Long.parseLong(level.replace("s", "")));
+        }
+        tranMap.put("timeOut", time);
+        tranMap.put("mqExecTime", dt.toInstant(ZoneOffset.of("+8")).toEpochMilli());
+        return tranMap;
+    }
+
     /**
      * 设置mq buffer type
      *

+ 2 - 14
themis-business/src/main/java/com/qmth/themis/business/service/impl/TEExamServiceImpl.java

@@ -42,8 +42,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.math.BigDecimal;
 import java.text.SimpleDateFormat;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
 import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -1146,19 +1144,9 @@ public class TEExamServiceImpl extends ServiceImpl<TEExamMapper, TEExam> impleme
         if (!ExamRecordStatusEnum.FINISHED.equals(status)) {
             return;
         }
-
-        Map<String, Object> transMap = new HashMap<String, Object>();
-        transMap.put("recordId", recordId);
-        String level = "10s";
-        Integer time = SystemConstant.mqDelayLevel.get(level);
-        LocalDateTime dt = LocalDateTime.now();
-        dt = dt.plusSeconds(Long.parseLong(level.replace("s", "")));
-        Map<String, Object> propMap = new HashMap<String, Object>();
-        propMap.put("timeOut", time);
-        propMap.put("mqExecTime", dt.toInstant(ZoneOffset.of("+8")).toEpochMilli());
-        MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.EXAM_RECORD_PERSISTED.name(), transMap,
+        Map<String, Object> propMap = mqDtoService.buildMqDelayMsg("10s");
+        MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.EXAM_RECORD_PERSISTED.name(), Collections.singletonMap("recordId", recordId),
                 MqTagEnum.EXAM_RECORD_PERSISTED, recordId.toString(), propMap, recordId.toString());
-
         mqDtoService.assembleSendAsyncDelayMsg(mqDto);
     }
 

+ 1 - 9
themis-business/src/main/java/com/qmth/themis/business/service/impl/TOeExamRecordServiceImpl.java

@@ -331,17 +331,9 @@ public class TOeExamRecordServiceImpl extends ServiceImpl<TOeExamRecordMapper, T
         transMap.put("mainNumber", mainNumber);
         transMap.put("subNumber", subNumber);
         transMap.put("subIndex", subIndex);
-
-        String level = "1s";
-        Integer time = SystemConstant.mqDelayLevel.get(level);
-        LocalDateTime dt = LocalDateTime.now();
-        dt = dt.plusSeconds(Long.parseLong(level.replace("s", "")));
-        Map<String, Object> propMap = new HashMap<String, Object>();
-        propMap.put("timeOut", time);
-        propMap.put("mqExecTime", dt.toInstant(ZoneOffset.of("+8")).toEpochMilli());
+        Map<String, Object> propMap = mqDtoService.buildMqDelayMsg("1s");
         MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.CALCULATE_OBJECTIVE_SCORE.name(),
                 transMap, MqTagEnum.CALCULATE_OBJECTIVE_SCORE, recordId.toString(), propMap, recordId.toString());
-
         mqDtoService.assembleSendAsyncDelayMsg(mqDto);
     }
 

+ 1 - 10
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketOeServer.java

@@ -35,8 +35,6 @@ import javax.websocket.server.ServerEndpoint;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -130,15 +128,8 @@ public class WebSocketOeServer implements Concurrently {
                     log.info("超时退出");
                     //发送延时mq消息start
                     MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
+                    tranMap = mqDtoService.buildMqDelayMsg("2m");
                     MqUtil mqUtil = SpringContextHolder.getBean(MqUtil.class);
-                    String level = "2m";
-//                    String level = "30s";
-                    Integer time = SystemConstant.mqDelayLevel.get(level);
-                    LocalDateTime dt = LocalDateTime.now();
-                    dt = dt.plusMinutes(Long.parseLong(level.replace("m", "")));
-//                    dt = dt.plusSeconds(Long.parseLong(level.replace("s", "")));
-                    tranMap.put("timeOut", time);
-                    tranMap.put("mqExecTime", dt.toInstant(ZoneOffset.of("+8")).toEpochMilli());
                     MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_UN_NORMAL.name(), MqTagEnum.OE_UN_NORMAL, MqTagEnum.OE_UN_NORMAL, String.valueOf(this.recordId), this.tranMap, this.sessionId);
                     mqDtoService.assembleSendAsyncDelayMsg(mqDto);
                     //发送延时mq消息end