|
@@ -1,7 +1,7 @@
|
|
package com.qmth.themis.mq.service.impl;
|
|
package com.qmth.themis.mq.service.impl;
|
|
|
|
|
|
-import com.alibaba.fastjson.JSONObject;
|
|
|
|
import com.qmth.themis.business.constant.SystemConstant;
|
|
import com.qmth.themis.business.constant.SystemConstant;
|
|
|
|
+import com.qmth.themis.business.util.JacksonUtil;
|
|
import com.qmth.themis.common.exception.BusinessException;
|
|
import com.qmth.themis.common.exception.BusinessException;
|
|
import com.qmth.themis.common.util.Result;
|
|
import com.qmth.themis.common.util.Result;
|
|
import com.qmth.themis.common.util.ResultUtil;
|
|
import com.qmth.themis.common.util.ResultUtil;
|
|
@@ -41,10 +41,10 @@ public class ProducerServerImpl implements ProducerServer {
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public Result syncMsg(MqDto mqDto) {
|
|
public Result syncMsg(MqDto mqDto) {
|
|
- log.info("syncMsg mqDto:{}", JSONObject.toJSONString(mqDto));
|
|
|
|
|
|
+ log.info("syncMsg mqDto:{}", JacksonUtil.parseJson(mqDto));
|
|
SendResult sendResult = rocketMQTemplate.syncSend(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto);
|
|
SendResult sendResult = rocketMQTemplate.syncSend(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto);
|
|
// 同步消息发送成功会有一个返回值,我们可以用这个返回值进行判断和获取一些信息
|
|
// 同步消息发送成功会有一个返回值,我们可以用这个返回值进行判断和获取一些信息
|
|
- log.info("sendResult:{}", JSONObject.toJSONString(sendResult));
|
|
|
|
|
|
+ log.info("sendResult:{}", JacksonUtil.parseJson(sendResult));
|
|
return ResultUtil.ok(SystemConstant.SUCCESS);
|
|
return ResultUtil.ok(SystemConstant.SUCCESS);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -56,10 +56,10 @@ public class ProducerServerImpl implements ProducerServer {
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public Result syncOrderlyMsg(MqDto mqDto) {
|
|
public Result syncOrderlyMsg(MqDto mqDto) {
|
|
- log.info("syncOrderlyMsg mqDto:{}", JSONObject.toJSONString(mqDto));
|
|
|
|
|
|
+ log.info("syncOrderlyMsg mqDto:{}", JacksonUtil.parseJson(mqDto));
|
|
SendResult sendResult = rocketMQTemplate.syncSendOrderly(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto, mqDto.getObjId());
|
|
SendResult sendResult = rocketMQTemplate.syncSendOrderly(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto, mqDto.getObjId());
|
|
// 同步顺序消息发送成功会有一个返回值,我们可以用这个返回值进行判断和获取一些信息
|
|
// 同步顺序消息发送成功会有一个返回值,我们可以用这个返回值进行判断和获取一些信息
|
|
- log.info("sendResult:{}", JSONObject.toJSONString(sendResult));
|
|
|
|
|
|
+ log.info("sendResult:{}", JacksonUtil.parseJson(sendResult));
|
|
return ResultUtil.ok(SystemConstant.SUCCESS);
|
|
return ResultUtil.ok(SystemConstant.SUCCESS);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -71,12 +71,12 @@ public class ProducerServerImpl implements ProducerServer {
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public Result asyncMsg(MqDto mqDto) {
|
|
public Result asyncMsg(MqDto mqDto) {
|
|
- log.info("asyncMsg mqDto:{}", JSONObject.toJSONString(mqDto));
|
|
|
|
|
|
+ log.info("asyncMsg mqDto:{}", JacksonUtil.parseJson(mqDto));
|
|
rocketMQTemplate.asyncSend(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto, new SendCallback() {
|
|
rocketMQTemplate.asyncSend(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto, new SendCallback() {
|
|
@Override
|
|
@Override
|
|
public void onSuccess(SendResult sendResult) {
|
|
public void onSuccess(SendResult sendResult) {
|
|
// 成功回调
|
|
// 成功回调
|
|
- log.info("sendResult:{}", JSONObject.toJSONString(sendResult));
|
|
|
|
|
|
+ log.info("sendResult:{}", JacksonUtil.parseJson(sendResult));
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -96,12 +96,12 @@ public class ProducerServerImpl implements ProducerServer {
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public Result asyncOrderlyMsg(MqDto mqDto) {
|
|
public Result asyncOrderlyMsg(MqDto mqDto) {
|
|
- log.info("asyncOrderlyMsg mqDto:{}", JSONObject.toJSONString(mqDto));
|
|
|
|
|
|
+ log.info("asyncOrderlyMsg mqDto:{}", JacksonUtil.parseJson(mqDto));
|
|
rocketMQTemplate.asyncSendOrderly(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto, mqDto.getObjId(), new SendCallback() {
|
|
rocketMQTemplate.asyncSendOrderly(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto, mqDto.getObjId(), new SendCallback() {
|
|
@Override
|
|
@Override
|
|
public void onSuccess(SendResult sendResult) {
|
|
public void onSuccess(SendResult sendResult) {
|
|
// 成功回调
|
|
// 成功回调
|
|
- log.info("sendResult:{}", JSONObject.toJSONString(sendResult));
|
|
|
|
|
|
+ log.info("sendResult:{}", JacksonUtil.parseJson(sendResult));
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -121,7 +121,7 @@ public class ProducerServerImpl implements ProducerServer {
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public Result sendOneWay(MqDto mqDto) {
|
|
public Result sendOneWay(MqDto mqDto) {
|
|
- log.info("sendOneWay mqDto:{}", JSONObject.toJSONString(mqDto));
|
|
|
|
|
|
+ log.info("sendOneWay mqDto:{}", JacksonUtil.parseJson(mqDto));
|
|
rocketMQTemplate.sendOneWay(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto);
|
|
rocketMQTemplate.sendOneWay(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto);
|
|
return ResultUtil.ok(SystemConstant.SUCCESS);
|
|
return ResultUtil.ok(SystemConstant.SUCCESS);
|
|
}
|
|
}
|
|
@@ -134,7 +134,7 @@ public class ProducerServerImpl implements ProducerServer {
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public Result sendOneWayOrderly(MqDto mqDto) {
|
|
public Result sendOneWayOrderly(MqDto mqDto) {
|
|
- log.info("sendOneWayOrderly mqDto:{}", JSONObject.toJSONString(mqDto));
|
|
|
|
|
|
+ log.info("sendOneWayOrderly mqDto:{}", JacksonUtil.parseJson(mqDto));
|
|
rocketMQTemplate.sendOneWayOrderly(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto, mqDto.getObjId());
|
|
rocketMQTemplate.sendOneWayOrderly(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto, mqDto.getObjId());
|
|
return ResultUtil.ok(SystemConstant.SUCCESS);
|
|
return ResultUtil.ok(SystemConstant.SUCCESS);
|
|
}
|
|
}
|
|
@@ -147,12 +147,12 @@ public class ProducerServerImpl implements ProducerServer {
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public Result sendMsgTran(MqDto mqDto) {
|
|
public Result sendMsgTran(MqDto mqDto) {
|
|
- log.info("sendMsgTran mqDto:{}", JSONObject.toJSONString(mqDto));
|
|
|
|
|
|
+ log.info("sendMsgTran mqDto:{}", JacksonUtil.parseJson(mqDto));
|
|
Message message = assembleMessage(mqDto);
|
|
Message message = assembleMessage(mqDto);
|
|
org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(message).build();
|
|
org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(message).build();
|
|
//发送事务消息
|
|
//发送事务消息
|
|
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(mqDto.getTopic() + ":" + mqDto.getTag(), messageTran, null);
|
|
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(mqDto.getTopic() + ":" + mqDto.getTag(), messageTran, null);
|
|
- log.info("transactionSendResult:{}", JSONObject.toJSONString(transactionSendResult));
|
|
|
|
|
|
+ log.info("transactionSendResult:{}", JacksonUtil.parseJson(transactionSendResult));
|
|
return ResultUtil.ok(SystemConstant.SUCCESS);
|
|
return ResultUtil.ok(SystemConstant.SUCCESS);
|
|
}
|
|
}
|
|
|
|
|