|
@@ -0,0 +1,131 @@
|
|
|
+package com.qmth.themis.business.service.impl;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.qmth.themis.business.constant.SystemConstant;
|
|
|
+import com.qmth.themis.business.service.ProducerServer;
|
|
|
+import com.qmth.themis.common.exception.BusinessException;
|
|
|
+import com.qmth.themis.common.util.Result;
|
|
|
+import com.qmth.themis.common.util.ResultUtil;
|
|
|
+import org.apache.rocketmq.client.producer.SendCallback;
|
|
|
+import org.apache.rocketmq.client.producer.SendResult;
|
|
|
+import org.apache.rocketmq.client.producer.TransactionSendResult;
|
|
|
+import org.apache.rocketmq.common.message.Message;
|
|
|
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.messaging.support.MessageBuilder;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @Description: mq 服务实现类
|
|
|
+ * @Param:
|
|
|
+ * @return:
|
|
|
+ * @Author: wangliang
|
|
|
+ * @Date: 2020/6/28
|
|
|
+ */
|
|
|
+@Service
|
|
|
+public class ProducerServerImpl implements ProducerServer {
|
|
|
+ private final static Logger log = LoggerFactory.getLogger(ProducerServerImpl.class);
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ RocketMQTemplate rocketMQTemplate;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 同步消息
|
|
|
+ *
|
|
|
+ * @param topic
|
|
|
+ * @param tags
|
|
|
+ * @param msg
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public Result syncMsg(String topic, String tags, Object msg) {
|
|
|
+ log.info("syncMsg topic:{},tags:{},msg:{}", topic, tags, JSONObject.toJSONString(msg));
|
|
|
+ Message message = new Message();
|
|
|
+ message.setTopic(topic);
|
|
|
+ message.setBody(JSONObject.toJSONString(msg).getBytes());
|
|
|
+ message.setTags(tags);
|
|
|
+ message.putUserProperty(tags, JSONObject.toJSONString(msg));
|
|
|
+ SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
|
|
|
+ // 同步消息发送成功会有一个返回值,我们可以用这个返回值进行判断和获取一些信息
|
|
|
+ log.info("sendResult:{}", JSONObject.toJSONString(sendResult));
|
|
|
+ return ResultUtil.ok(SystemConstant.SUCCESS);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 异步消息
|
|
|
+ *
|
|
|
+ * @param topic
|
|
|
+ * @param tags
|
|
|
+ * @param msg
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public Result asyncMsg(String topic, String tags, Object msg) {
|
|
|
+ log.info("asyncMsg topic:{},tags:{},msg:{}", topic, tags, JSONObject.toJSONString(msg));
|
|
|
+ Message message = new Message();
|
|
|
+ message.setTopic(topic);
|
|
|
+ message.setBody(JSONObject.toJSONString(msg).getBytes());
|
|
|
+ message.setTags(tags);
|
|
|
+ message.putUserProperty(tags, JSONObject.toJSONString(msg));
|
|
|
+ rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
|
|
|
+ @Override
|
|
|
+ public void onSuccess(SendResult sendResult) {
|
|
|
+ // 成功回调
|
|
|
+ log.info("sendResult:{}", JSONObject.toJSONString(sendResult));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onException(Throwable throwable) {
|
|
|
+ // 失败回调
|
|
|
+ throw new BusinessException(throwable.getMessage());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return ResultUtil.ok(SystemConstant.SUCCESS);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 单向消息
|
|
|
+ *
|
|
|
+ * @param topic
|
|
|
+ * @param tags
|
|
|
+ * @param msg
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public Result sendOneWay(String topic, String tags, Object msg) {
|
|
|
+ log.info("sendOneWay topic:{},tags:{},msg:{}", topic, tags, JSONObject.toJSONString(msg));
|
|
|
+ Message message = new Message();
|
|
|
+ message.setTopic(topic);
|
|
|
+ message.setBody(JSONObject.toJSONString(msg).getBytes());
|
|
|
+ message.setTags(tags);
|
|
|
+ message.putUserProperty(tags, JSONObject.toJSONString(msg));
|
|
|
+ rocketMQTemplate.sendOneWay(topic + ":" + tags, message);
|
|
|
+ return ResultUtil.ok(SystemConstant.SUCCESS);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 事务消息
|
|
|
+ *
|
|
|
+ * @param topic
|
|
|
+ * @param tags
|
|
|
+ * @param msg
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public Result sendMsgTran(String topic, String tags, Object msg) {
|
|
|
+ log.info("sendMsgTran topic:{},tags:{},msg:{}", topic, tags, JSONObject.toJSONString(msg));
|
|
|
+ Message message = new Message();
|
|
|
+ message.setTopic(topic);
|
|
|
+ message.setBody(JSONObject.toJSONString(msg).getBytes());
|
|
|
+ message.setTags(tags);
|
|
|
+ message.putUserProperty(tags, JSONObject.toJSONString(msg));
|
|
|
+ org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(message).build();
|
|
|
+ //发送事务消息
|
|
|
+ TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(topic, messageTran, null);
|
|
|
+ log.info("transactionSendResult:{}", JSONObject.toJSONString(transactionSendResult));
|
|
|
+ return ResultUtil.ok(SystemConstant.SUCCESS);
|
|
|
+ }
|
|
|
+}
|