Pārlūkot izejas kodu

新增redis消息队列

wangliang 4 gadi atpakaļ
vecāks
revīzija
93be4c6f9f

+ 0 - 3
distributed-print/src/main/java/com/qmth/distributed/print/config/RedisListenerConfig.java

@@ -18,9 +18,6 @@ import javax.annotation.Resource;
 @Configuration
 public class RedisListenerConfig {
 
-    @Resource
-    private RedisTemplate redisTemplate;
-
     @Resource
     private RedisConnectionFactory redisConnectionFactory;
 

+ 17 - 12
distributed-print/src/main/java/com/qmth/distributed/print/config/RedisMessageListener.java

@@ -1,7 +1,10 @@
 package com.qmth.distributed.print.config;
 
+import com.google.gson.Gson;
 import com.qmth.teachcloud.common.bean.dto.MqDto;
 import com.qmth.teachcloud.common.contant.SystemConstant;
+import com.qmth.teachcloud.common.entity.TMMqMessage;
+import com.qmth.teachcloud.common.service.TMMqMessageService;
 import com.qmth.teachcloud.common.util.JacksonUtil;
 import com.qmth.teachcloud.common.util.RedisUtil;
 import org.apache.commons.text.StringEscapeUtils;
@@ -23,6 +26,9 @@ public class RedisMessageListener implements MessageListener {
     @Resource
     RedisUtil redisUtil;
 
+    @Resource
+    TMMqMessageService tmMqMessageService;
+
     @Override
     public void onMessage(Message message, byte[] bytes) {
         MqDto mqDto = null;
@@ -33,15 +39,14 @@ public class RedisMessageListener implements MessageListener {
             if (Objects.nonNull(body)) {
                 mqDto = JacksonUtil.readJson(body.substring(1, body.length() - 1), MqDto.class);
                 for (; integer.get() < mqDto.getReconsume(); integer.incrementAndGet()) {
+                    log.info("integer:{}", integer.get());
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE
                             && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(),
                             SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
                         //通道
-                        String channel = new String(message.getChannel(), SystemConstant.CHARSET_NAME);
-                        //渠道名称
-                        String topic = new String(bytes, SystemConstant.CHARSET_NAME);
+                        String topic = new String(message.getChannel(), SystemConstant.CHARSET_NAME);
                         mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
-                        log.info("mqDto:{},topic:{},channel:{}", JacksonUtil.parseJson(mqDto), JacksonUtil.parseJson(topic), JacksonUtil.parseJson(channel));
+                        log.info("mqDto:{},topic:{}", JacksonUtil.parseJson(mqDto), JacksonUtil.parseJson(topic));
                         break;
                     } else {
                         mqDto.setAck(SystemConstant.REDELIVERED_ACK_TYPE);
@@ -52,17 +57,17 @@ public class RedisMessageListener implements MessageListener {
         } catch (Exception e) {
             log.error("redis mq消息监听,消息消费出错", e);
             e.printStackTrace();
-            if (Objects.nonNull(mqDto)) {
-                mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
-                integer.set(3);
-            }
+            integer.set(SystemConstant.REDIS_MQ_MAX_RECONSUME);
         } finally {
-            if (integer.get() == 3 && Objects.nonNull(mqDto)) {
+            if (integer.get() == SystemConstant.REDIS_MQ_MAX_RECONSUME && Objects.nonNull(mqDto)) {//存入库
+                Gson gson = new Gson();
                 mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
-                //存入库
+                TMMqMessage tmMqMessage = gson.fromJson(gson.toJson(mqDto), TMMqMessage.class);
+                tmMqMessage.setBody(JacksonUtil.parseJson(tmMqMessage.getBody()));
+                tmMqMessageService.saveOrUpdate(tmMqMessage);
             }
-            if (Objects.nonNull(mqDto) && mqDto.getAck().intValue() == SystemConstant.STANDARD_ACK_TYPE
-                    || mqDto.getAck().intValue() == SystemConstant.POSION_ACK_TYPE) {
+            if (Objects.nonNull(mqDto) && (mqDto.getAck().intValue() == SystemConstant.STANDARD_ACK_TYPE
+                    || mqDto.getAck().intValue() == SystemConstant.POSION_ACK_TYPE)) {
                 redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
             }
         }

+ 1 - 1
teachcloud-common/src/main/java/com/qmth/teachcloud/common/bean/dto/MqDto.java

@@ -28,7 +28,7 @@ public class MqDto implements Serializable {
     private Integer ack;//ack
     private Integer sequence;//序号
     private Map<String, Object> properties;//扩展类型
-    private Integer reconsume = 3;//重试次数
+    private Integer reconsume = SystemConstant.REDIS_MQ_MAX_RECONSUME;//重试次数
 
     public MqDto() {
 

+ 1 - 0
teachcloud-common/src/main/java/com/qmth/teachcloud/common/contant/SystemConstant.java

@@ -121,6 +121,7 @@ public class SystemConstant {
      * redis mq
      */
     public static final String REDIS_LOCK_MQ_PREFIX = "redis:lock:mq:";
+    public static final int REDIS_MQ_MAX_RECONSUME = 3;
 
     /**
      * redis lock

+ 185 - 0
teachcloud-common/src/main/java/com/qmth/teachcloud/common/entity/TMMqMessage.java

@@ -0,0 +1,185 @@
+package com.qmth.teachcloud.common.entity;
+
+import com.baomidou.mybatisplus.annotation.FieldFill;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
+import com.qmth.teachcloud.common.enums.MqTagEnum;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * @Description: mq消息
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/9
+ */
+@ApiModel(value = "t_m_mq_message", description = "mq消息")
+public class TMMqMessage implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    @JsonSerialize(using = ToStringSerializer.class)
+    @ApiModelProperty(value = "主键")
+    @TableId(value = "id")
+    private String id;
+
+    @ApiModelProperty(value = "消息topic")
+    @TableField(value = "topic")
+    private String topic;
+
+    @ApiModelProperty(value = "消息tag")
+    @TableField(value = "tag")
+    private String tag;
+
+    @ApiModelProperty(value = "消息内容")
+    @TableField(value = "body")
+    private Object body;
+
+    @ApiModelProperty(value = "消息类型")
+    @TableField(value = "type")
+    private MqTagEnum type;
+
+    @ApiModelProperty(value = "关联业务id")
+    @TableField(value = "obj_id")
+    private String objId;
+
+    @ApiModelProperty(value = "关联业务名称")
+    @TableField(value = "obj_name")
+    private String objName;
+
+    @ApiModelProperty(value = "消息ack")
+    @TableField(value = "ack")
+    private Integer ack;
+
+    @ApiModelProperty(value = "消息序号")
+    @TableField(value = "sequence")
+    private Integer sequence;
+
+    @ApiModelProperty(value = "扩展类型")
+    @TableField(exist = false)
+    private Map<String, Object> properties;
+
+    @TableField(value = "properties")
+    private String prop;
+
+    @ApiModelProperty(value = "创建时间")
+    @TableField(value = "create_time", fill = FieldFill.INSERT)
+    private Long createTime;
+
+    @ApiModelProperty(value = "时间戳")
+    @TableField(value = "timestamp")
+    private Long timestamp;
+
+    public String getProp() {
+        return prop;
+    }
+
+    public void setProp(String prop) {
+        this.prop = prop;
+    }
+
+    public static long getSerialVersionUID() {
+        return serialVersionUID;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getTag() {
+        return tag;
+    }
+
+    public void setTag(String tag) {
+        this.tag = tag;
+    }
+
+    public String getObjId() {
+        return objId;
+    }
+
+    public void setObjId(String objId) {
+        this.objId = objId;
+    }
+
+    public String getObjName() {
+        return objName;
+    }
+
+    public void setObjName(String objName) {
+        this.objName = objName;
+    }
+
+    public Integer getAck() {
+        return ack;
+    }
+
+    public void setAck(Integer ack) {
+        this.ack = ack;
+    }
+
+    public Integer getSequence() {
+        return sequence;
+    }
+
+    public void setSequence(Integer sequence) {
+        this.sequence = sequence;
+    }
+
+    public Long getCreateTime() {
+        return createTime;
+    }
+
+    public void setCreateTime(Long createTime) {
+        this.createTime = createTime;
+    }
+
+    public Object getBody() {
+        return body;
+    }
+
+    public void setBody(Object body) {
+        this.body = body;
+    }
+
+    public MqTagEnum getType() {
+        return type;
+    }
+
+    public void setType(MqTagEnum type) {
+        this.type = type;
+    }
+
+    public Map<String, Object> getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Map<String, Object> properties) {
+        this.properties = properties;
+    }
+
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(Long timestamp) {
+        this.timestamp = timestamp;
+    }
+}

+ 15 - 0
teachcloud-common/src/main/java/com/qmth/teachcloud/common/mapper/TMMqMessageMapper.java

@@ -0,0 +1,15 @@
+package com.qmth.teachcloud.common.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.qmth.teachcloud.common.entity.TMMqMessage;
+
+/**
+ * @Description: mq消息 Mapper 接口
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/9
+ */
+public interface TMMqMessageMapper extends BaseMapper<TMMqMessage> {
+
+}

+ 15 - 0
teachcloud-common/src/main/java/com/qmth/teachcloud/common/service/TMMqMessageService.java

@@ -0,0 +1,15 @@
+package com.qmth.teachcloud.common.service;
+
+import com.baomidou.mybatisplus.extension.service.IService;
+import com.qmth.teachcloud.common.entity.TMMqMessage;
+
+/**
+ * @Description: mq消息 服务类
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/9
+ */
+public interface TMMqMessageService extends IService<TMMqMessage> {
+
+}

+ 19 - 0
teachcloud-common/src/main/java/com/qmth/teachcloud/common/service/impl/TMMqMessageServiceImpl.java

@@ -0,0 +1,19 @@
+package com.qmth.teachcloud.common.service.impl;
+
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.qmth.teachcloud.common.entity.TMMqMessage;
+import com.qmth.teachcloud.common.mapper.TMMqMessageMapper;
+import com.qmth.teachcloud.common.service.TMMqMessageService;
+import org.springframework.stereotype.Service;
+
+/**
+ * @Description: mq消息 服务实现类
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/9
+ */
+@Service
+public class TMMqMessageServiceImpl extends ServiceImpl<TMMqMessageMapper, TMMqMessage> implements TMMqMessageService {
+
+}

+ 5 - 0
teachcloud-common/src/main/resources/mapper/TMMqMessageMapper.xml

@@ -0,0 +1,5 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.qmth.teachcloud.common.mapper.TMMqMessageMapper">
+
+</mapper>