瀏覽代碼

延迟消息结构修改

xiatian 4 年之前
父節點
當前提交
e8616c931b

+ 25 - 24
themis-business/src/main/java/com/qmth/themis/business/service/impl/ProducerServerImpl.java

@@ -1,24 +1,25 @@
 package com.qmth.themis.business.service.impl;
 
-import com.qmth.themis.business.constant.SystemConstant;
-import com.qmth.themis.business.util.JacksonUtil;
-import com.qmth.themis.common.exception.BusinessException;
-import com.qmth.themis.common.util.Result;
-import com.qmth.themis.common.util.ResultUtil;
-import com.qmth.themis.business.dto.MqDto;
-import com.qmth.themis.business.service.ProducerServer;
+import java.util.Collections;
+
+import javax.annotation.Resource;
+
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.TransactionSendResult;
-import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.messaging.support.MessageBuilder;
 import org.springframework.stereotype.Service;
 
-import javax.annotation.Resource;
-import java.util.Collections;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.dto.MqDto;
+import com.qmth.themis.business.service.ProducerServer;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.common.exception.BusinessException;
+import com.qmth.themis.common.util.Result;
+import com.qmth.themis.common.util.ResultUtil;
 
 /**
  * @Description: mq 服务实现类
@@ -73,8 +74,8 @@ public class ProducerServerImpl implements ProducerServer {
     @Override
     public Result syncDelayMsg(MqDto mqDto) {
         log.info("syncDelayMsg mqDto:{}", JacksonUtil.parseJson(mqDto));
-        Message message = assembleMessage(mqDto);
-        org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(message).build();
+//        Message message = assembleMessage(mqDto);
+        org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(mqDto).build();
         rocketMQTemplate.syncSend(mqDto.getTopic() + ":" + mqDto.getTag(), messageTran, SystemConstant.MESSAGE_TIMEOUT, Integer.parseInt(String.valueOf(mqDto.getProperties().get("timeOut"))));
         return ResultUtil.ok(Collections.singletonMap("success", true));
     }
@@ -138,8 +139,8 @@ public class ProducerServerImpl implements ProducerServer {
     @Override
     public Result asyncDelayMsg(MqDto mqDto) {
         log.info("asyncDelayMsg mqDto:{}", JacksonUtil.parseJson(mqDto));
-        Message message = assembleMessage(mqDto);
-        org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(message).build();
+//        Message message = assembleMessage(mqDto);
+        org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(mqDto).build();
         rocketMQTemplate.asyncSend(mqDto.getTopic() + ":" + mqDto.getTag(), messageTran, new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
@@ -191,8 +192,8 @@ public class ProducerServerImpl implements ProducerServer {
     @Override
     public Result sendMsgTran(MqDto mqDto) {
         log.info("sendMsgTran mqDto:{}", JacksonUtil.parseJson(mqDto));
-        Message message = assembleMessage(mqDto);
-        org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(message).build();
+//        Message message = assembleMessage(mqDto);
+        org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(mqDto).build();
         //发送事务消息
         TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(mqDto.getTopic() + ":" + mqDto.getTag(), messageTran, null);
         log.info("transactionSendResult:{}", JacksonUtil.parseJson(transactionSendResult));
@@ -205,12 +206,12 @@ public class ProducerServerImpl implements ProducerServer {
      * @param mqDto
      * @return
      */
-    public Message assembleMessage(MqDto mqDto) {
-        Message message = new Message();
-        message.setTopic(mqDto.getTopic());
-        message.setBody(String.valueOf(mqDto.getBody()).getBytes());
-        message.setTags(mqDto.getTag());
-        message.putUserProperty(SystemConstant.MQDTO_OBJ, JacksonUtil.parseJson(mqDto));
-        return message;
-    }
+//    public Message assembleMessage(MqDto mqDto) {
+//        Message message = new Message();
+//        message.setTopic(mqDto.getTopic());
+//        message.setBody(String.valueOf(mqDto.getBody()).getBytes());
+//        message.setTags(mqDto.getTag());
+//        message.putUserProperty(SystemConstant.MQDTO_OBJ, JacksonUtil.parseJson(mqDto));
+//        return message;
+//    }
 }

+ 0 - 3
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqLogicServiceImpl.java

@@ -278,9 +278,6 @@ public class MqLogicServiceImpl implements MqLogicService {
             //发送移动端监考退出考试mq消息 end
         }
         TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-        Map map = new HashMap();
-        map.put(SystemConstant.MQDTO_OBJ, JacksonUtil.parseJson(mqDto));
-        tmRocketMessage.setProp(JacksonUtil.parseJson(map));
         tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
         redisUtil.delete(key, mqDto.getId());

+ 2 - 5
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/ExamRecordPersistedConcurrentlyImpl.java

@@ -1,7 +1,6 @@
 package com.qmth.themis.mq.templete.impl;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 
 import javax.annotation.Resource;
@@ -13,9 +12,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
-import com.alibaba.fastjson.JSONObject;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.dto.MqDto;
+import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.common.contanst.Constants;
 import com.qmth.themis.mq.service.MqLogicService;
@@ -49,9 +48,7 @@ public class ExamRecordPersistedConcurrentlyImpl implements Concurrently {
                         messageExt.getReconsumeTimes());
                 String body = new String(messageExt.getBody(), Constants.CHARSET_NAME);
                 log.debug("CalculateObjectiveScore 接收到的消息 body:{}", body);
-                JSONObject jsonObject = JSONObject.parseObject(body);
-                Map properties = (Map) jsonObject.get("properties");
-                mqDto = JSONObject.toJavaObject(JSONObject.parseObject(String.valueOf(properties.get(SystemConstant.MQDTO_OBJ))), MqDto.class);
+                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                 int reconsumeTime = messageExt.getReconsumeTimes();
                 if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
                     mqLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);

+ 12 - 15
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/WebsocketUnNormalConcurrentlyImpl.java

@@ -1,14 +1,8 @@
 package com.qmth.themis.mq.templete.impl;
 
-import com.alibaba.fastjson.JSONObject;
-import com.qmth.themis.business.constant.SpringContextHolder;
-import com.qmth.themis.business.constant.SystemConstant;
-import com.qmth.themis.business.util.JacksonUtil;
-import com.qmth.themis.business.util.RedisUtil;
-import com.qmth.themis.common.contanst.Constants;
-import com.qmth.themis.business.dto.MqDto;
-import com.qmth.themis.mq.service.MqLogicService;
-import com.qmth.themis.mq.templete.Concurrently;
+import java.util.List;
+import java.util.Objects;
+
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -16,9 +10,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import com.qmth.themis.business.constant.SpringContextHolder;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.dto.MqDto;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.common.contanst.Constants;
+import com.qmth.themis.mq.service.MqLogicService;
+import com.qmth.themis.mq.templete.Concurrently;
 
 /**
  * @Description: mq 延时消息监听 websocket超时退出 并行消费监听
@@ -43,9 +42,7 @@ public class WebsocketUnNormalConcurrentlyImpl implements Concurrently {
                 log.info(":{}-:{} websocket unnormal Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
                 String body = new String(messageExt.getBody(), Constants.CHARSET_NAME);
                 log.info("body:{}", body);
-                JSONObject jsonObject = JSONObject.parseObject(body);
-                Map properties = (Map) jsonObject.get("properties");
-                mqDto = JSONObject.toJavaObject(JSONObject.parseObject(String.valueOf(properties.get(SystemConstant.MQDTO_OBJ))), MqDto.class);
+                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                 log.info(":{}-:{} websocket unnormal Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
                 int reconsumeTime = messageExt.getReconsumeTimes();
                 if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {