|
@@ -1,24 +1,25 @@
|
|
|
package com.qmth.themis.business.service.impl;
|
|
|
|
|
|
-import com.qmth.themis.business.constant.SystemConstant;
|
|
|
-import com.qmth.themis.business.util.JacksonUtil;
|
|
|
-import com.qmth.themis.common.exception.BusinessException;
|
|
|
-import com.qmth.themis.common.util.Result;
|
|
|
-import com.qmth.themis.common.util.ResultUtil;
|
|
|
-import com.qmth.themis.business.dto.MqDto;
|
|
|
-import com.qmth.themis.business.service.ProducerServer;
|
|
|
+import java.util.Collections;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+
|
|
|
import org.apache.rocketmq.client.producer.SendCallback;
|
|
|
import org.apache.rocketmq.client.producer.SendResult;
|
|
|
import org.apache.rocketmq.client.producer.TransactionSendResult;
|
|
|
-import org.apache.rocketmq.common.message.Message;
|
|
|
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.messaging.support.MessageBuilder;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
-import javax.annotation.Resource;
|
|
|
-import java.util.Collections;
|
|
|
+import com.qmth.themis.business.constant.SystemConstant;
|
|
|
+import com.qmth.themis.business.dto.MqDto;
|
|
|
+import com.qmth.themis.business.service.ProducerServer;
|
|
|
+import com.qmth.themis.business.util.JacksonUtil;
|
|
|
+import com.qmth.themis.common.exception.BusinessException;
|
|
|
+import com.qmth.themis.common.util.Result;
|
|
|
+import com.qmth.themis.common.util.ResultUtil;
|
|
|
|
|
|
/**
|
|
|
* @Description: mq 服务实现类
|
|
@@ -73,8 +74,8 @@ public class ProducerServerImpl implements ProducerServer {
|
|
|
@Override
|
|
|
public Result syncDelayMsg(MqDto mqDto) {
|
|
|
log.info("syncDelayMsg mqDto:{}", JacksonUtil.parseJson(mqDto));
|
|
|
- Message message = assembleMessage(mqDto);
|
|
|
- org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(message).build();
|
|
|
+// Message message = assembleMessage(mqDto);
|
|
|
+ org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(mqDto).build();
|
|
|
rocketMQTemplate.syncSend(mqDto.getTopic() + ":" + mqDto.getTag(), messageTran, SystemConstant.MESSAGE_TIMEOUT, Integer.parseInt(String.valueOf(mqDto.getProperties().get("timeOut"))));
|
|
|
return ResultUtil.ok(Collections.singletonMap("success", true));
|
|
|
}
|
|
@@ -138,8 +139,8 @@ public class ProducerServerImpl implements ProducerServer {
|
|
|
@Override
|
|
|
public Result asyncDelayMsg(MqDto mqDto) {
|
|
|
log.info("asyncDelayMsg mqDto:{}", JacksonUtil.parseJson(mqDto));
|
|
|
- Message message = assembleMessage(mqDto);
|
|
|
- org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(message).build();
|
|
|
+// Message message = assembleMessage(mqDto);
|
|
|
+ org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(mqDto).build();
|
|
|
rocketMQTemplate.asyncSend(mqDto.getTopic() + ":" + mqDto.getTag(), messageTran, new SendCallback() {
|
|
|
@Override
|
|
|
public void onSuccess(SendResult sendResult) {
|
|
@@ -191,8 +192,8 @@ public class ProducerServerImpl implements ProducerServer {
|
|
|
@Override
|
|
|
public Result sendMsgTran(MqDto mqDto) {
|
|
|
log.info("sendMsgTran mqDto:{}", JacksonUtil.parseJson(mqDto));
|
|
|
- Message message = assembleMessage(mqDto);
|
|
|
- org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(message).build();
|
|
|
+// Message message = assembleMessage(mqDto);
|
|
|
+ org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(mqDto).build();
|
|
|
//发送事务消息
|
|
|
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(mqDto.getTopic() + ":" + mqDto.getTag(), messageTran, null);
|
|
|
log.info("transactionSendResult:{}", JacksonUtil.parseJson(transactionSendResult));
|
|
@@ -205,12 +206,12 @@ public class ProducerServerImpl implements ProducerServer {
|
|
|
* @param mqDto
|
|
|
* @return
|
|
|
*/
|
|
|
- public Message assembleMessage(MqDto mqDto) {
|
|
|
- Message message = new Message();
|
|
|
- message.setTopic(mqDto.getTopic());
|
|
|
- message.setBody(String.valueOf(mqDto.getBody()).getBytes());
|
|
|
- message.setTags(mqDto.getTag());
|
|
|
- message.putUserProperty(SystemConstant.MQDTO_OBJ, JacksonUtil.parseJson(mqDto));
|
|
|
- return message;
|
|
|
- }
|
|
|
+// public Message assembleMessage(MqDto mqDto) {
|
|
|
+// Message message = new Message();
|
|
|
+// message.setTopic(mqDto.getTopic());
|
|
|
+// message.setBody(String.valueOf(mqDto.getBody()).getBytes());
|
|
|
+// message.setTags(mqDto.getTag());
|
|
|
+// message.putUserProperty(SystemConstant.MQDTO_OBJ, JacksonUtil.parseJson(mqDto));
|
|
|
+// return message;
|
|
|
+// }
|
|
|
}
|