wangliang 4 жил өмнө
parent
commit
2f84c0471e

+ 5 - 1
distributed-print/src/main/java/com/qmth/distributed/print/api/TBTaskController.java

@@ -7,11 +7,13 @@ import com.qmth.boot.api.constant.ApiConstant;
 import com.qmth.distributed.print.business.bean.result.EditResult;
 import com.qmth.distributed.print.business.templete.execute.AsyncCreatePdfTempleteService;
 import com.qmth.distributed.print.interceptor.AuthInterceptor;
+import com.qmth.teachcloud.common.bean.dto.MqDto;
 import com.qmth.teachcloud.common.bean.params.ArraysParams;
 import com.qmth.teachcloud.common.bean.result.TaskListResult;
 import com.qmth.teachcloud.common.contant.SystemConstant;
 import com.qmth.teachcloud.common.entity.SysUser;
 import com.qmth.teachcloud.common.entity.TBTask;
+import com.qmth.teachcloud.common.enums.MqTagEnum;
 import com.qmth.teachcloud.common.enums.TaskResultEnum;
 import com.qmth.teachcloud.common.enums.TaskStatusEnum;
 import com.qmth.teachcloud.common.enums.TaskTypeEnum;
@@ -99,7 +101,9 @@ public class TBTaskController {
         map.computeIfAbsent(SystemConstant.USER, v -> sysUser);
         map.computeIfAbsent(SystemConstant.MANUAL, v -> true);
 //        asyncCreatePdfTempleteService.createPdf(map, null);
-        redisUtil.setForLeftList(SystemConstant.PDF_CACHE, map);
+        MqDto mqDto = new MqDto(MqTagEnum.PDF.getCode(), map, String.valueOf(tbTask.getId()));
+//        redisUtil.setForLeftList(SystemConstant.PDF_CACHE, mqDto);
+        redisUtil.sendMessage(mqDto.getTopic(), mqDto);
         return ResultUtil.ok(new EditResult());
     }
 

+ 46 - 0
distributed-print/src/main/java/com/qmth/distributed/print/config/RedisListenerConfig.java

@@ -0,0 +1,46 @@
+package com.qmth.distributed.print.config;
+
+import com.qmth.teachcloud.common.contant.SystemConstant;
+import com.qmth.teachcloud.common.enums.MqTagEnum;
+import com.qmth.teachcloud.common.threadPool.MyThreadPool;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.listener.ChannelTopic;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.data.redis.listener.Topic;
+
+import javax.annotation.Resource;
+
+@Configuration
+public class RedisListenerConfig {
+
+    @Resource
+    private RedisTemplate redisTemplate;
+
+    @Resource
+    private RedisConnectionFactory redisConnectionFactory;
+
+    @Resource
+    private MessageListener messageListener;
+
+    @Resource
+    MyThreadPool myThreadPool;
+
+    @Bean
+    public RedisMessageListenerContainer initRedisContainer() {
+        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
+        //redis连接工厂
+        container.setConnectionFactory(redisConnectionFactory);
+        //设置运行任务池
+        container.setTaskExecutor(myThreadPool);
+        //定义监听渠道名称为
+        Topic topic = new ChannelTopic(MqTagEnum.PDF.getCode());
+        //定义监听器监听的Redis的消息
+        container.addMessageListener(messageListener, topic);
+        return container;
+    }
+}

+ 70 - 0
distributed-print/src/main/java/com/qmth/distributed/print/config/RedisMessageListener.java

@@ -0,0 +1,70 @@
+package com.qmth.distributed.print.config;
+
+import com.qmth.teachcloud.common.bean.dto.MqDto;
+import com.qmth.teachcloud.common.contant.SystemConstant;
+import com.qmth.teachcloud.common.util.JacksonUtil;
+import com.qmth.teachcloud.common.util.RedisUtil;
+import org.apache.commons.text.StringEscapeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Component
+public class RedisMessageListener implements MessageListener {
+    private final static Logger log = LoggerFactory.getLogger(RedisMessageListener.class);
+
+    @Resource
+    RedisUtil redisUtil;
+
+    @Override
+    public void onMessage(Message message, byte[] bytes) {
+        MqDto mqDto = null;
+        AtomicInteger integer = new AtomicInteger(0);
+        try {
+            //消息
+            String body = StringEscapeUtils.unescapeJson(new String(message.getBody(), SystemConstant.CHARSET_NAME));
+            if (Objects.nonNull(body)) {
+                mqDto = JacksonUtil.readJson(body.substring(1, body.length() - 1), MqDto.class);
+                for (; integer.get() < mqDto.getReconsume(); integer.incrementAndGet()) {
+                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE
+                            && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(),
+                            SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
+                        //通道
+                        String channel = new String(message.getChannel(), SystemConstant.CHARSET_NAME);
+                        //渠道名称
+                        String topic = new String(bytes, SystemConstant.CHARSET_NAME);
+                        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                        log.info("mqDto:{},topic:{},channel:{}", JacksonUtil.parseJson(mqDto), JacksonUtil.parseJson(topic), JacksonUtil.parseJson(channel));
+                        break;
+                    } else {
+                        mqDto.setAck(SystemConstant.REDELIVERED_ACK_TYPE);
+                        Thread.sleep(Duration.ofSeconds(1L).toMillis() * (integer.get() == 0 ? 1 : integer.get()));
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("redis mq消息监听,消息消费出错", e);
+            e.printStackTrace();
+            if (Objects.nonNull(mqDto)) {
+                mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
+                integer.set(3);
+            }
+        } finally {
+            if (integer.get() == 3 && Objects.nonNull(mqDto)) {
+                mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
+                //存入库
+            }
+            if (Objects.nonNull(mqDto) && mqDto.getAck().intValue() == SystemConstant.STANDARD_ACK_TYPE
+                    || mqDto.getAck().intValue() == SystemConstant.POSION_ACK_TYPE) {
+                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
+            }
+        }
+    }
+}

+ 6 - 0
pom.xml

@@ -46,6 +46,7 @@
         <itextpdf.version>5.5.13</itextpdf.version>
         <googleBar.version>3.4.0</googleBar.version>
         <freemarker.version>2.3.30</freemarker.version>
+        <commons-text.version>1.9</commons-text.version>
     </properties>
 
     <dependencyManagement>
@@ -135,6 +136,11 @@
                 <artifactId>commons-codec</artifactId>
                 <version>${commons.codec.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-text</artifactId>
+                <version>${commons-text.version}</version>
+            </dependency>
             <dependency>
                 <groupId>com.google.code.gson</groupId>
                 <artifactId>gson</artifactId>

+ 4 - 0
teachcloud-common/pom.xml

@@ -58,6 +58,10 @@
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-text</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>

+ 168 - 0
teachcloud-common/src/main/java/com/qmth/teachcloud/common/bean/dto/MqDto.java

@@ -0,0 +1,168 @@
+package com.qmth.teachcloud.common.bean.dto;
+
+import com.qmth.teachcloud.common.contant.SystemConstant;
+import com.qmth.teachcloud.common.enums.MqTagEnum;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * @Description: mq dto
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/1
+ */
+public class MqDto implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private String id;//消息id
+    private String topic;//消息topic
+    private String tag;//消息tag
+    private Long timestamp;//时间戳
+    private Object body;//消息体
+    private MqTagEnum type;//消息类型
+    private String objId;//关联业务id
+    private String objName;//关联业务名称
+    private Integer ack;//ack
+    private Integer sequence;//序号
+    private Map<String, Object> properties;//扩展类型
+    private Integer reconsume = 3;//重试次数
+
+    public MqDto() {
+
+    }
+
+
+    public MqDto(String topic, String tag, String objId, Long timestamp) {
+        this.topic = topic;
+        this.tag = tag;
+        this.body = tag;
+        this.type = MqTagEnum.valueOf(tag);
+        this.objId = objId;
+        this.timestamp = timestamp;
+        this.id = SystemConstant.getUuid();
+    }
+
+    public MqDto(String topic, Object body, String objId) {
+        this.topic = topic;
+        this.tag = topic;
+        this.body = body;
+        this.type = MqTagEnum.valueOf(MqTagEnum.convertToName(tag));
+        this.objId = objId;
+        this.timestamp = System.currentTimeMillis();
+        this.id = SystemConstant.getUuid();
+    }
+
+    public MqDto(String topic, String tag, Object body, MqTagEnum type, String objId, Map properties) {
+        this.topic = topic;
+        this.tag = tag;
+        this.body = body;
+        this.type = type;
+        this.objId = objId;
+        this.timestamp = System.currentTimeMillis();
+        this.id = SystemConstant.getUuid();
+        this.properties = properties;
+    }
+
+    public Integer getReconsume() {
+        return reconsume;
+    }
+
+    public void setReconsume(Integer reconsume) {
+        this.reconsume = reconsume;
+    }
+
+    public static long getSerialVersionUID() {
+        return serialVersionUID;
+    }
+
+    public MqTagEnum getType() {
+        return type;
+    }
+
+    public void setType(MqTagEnum type) {
+        this.type = type;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getTag() {
+        return tag;
+    }
+
+    public void setTag(String tag) {
+        this.tag = tag;
+    }
+
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(Long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public Object getBody() {
+        return body;
+    }
+
+    public void setBody(Object body) {
+        this.body = body;
+    }
+
+    public String getObjId() {
+        return objId;
+    }
+
+    public void setObjId(String objId) {
+        this.objId = objId;
+    }
+
+    public String getObjName() {
+        return objName;
+    }
+
+    public void setObjName(String objName) {
+        this.objName = objName;
+    }
+
+    public Map<String, Object> getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Map<String, Object> properties) {
+        this.properties = properties;
+    }
+
+    public Integer getAck() {
+        return ack;
+    }
+
+    public void setAck(Integer ack) {
+        this.ack = ack;
+    }
+
+    public Integer getSequence() {
+        return sequence;
+    }
+
+    public void setSequence(Integer sequence) {
+        this.sequence = sequence;
+    }
+}

+ 18 - 1
teachcloud-common/src/main/java/com/qmth/teachcloud/common/contant/SystemConstant.java

@@ -117,11 +117,17 @@ public class SystemConstant {
     public static final long REDIS_CREATE_PDF_EXPIRE_TIME = 1 * 60L * 60L;//过期时间1小时
     public static final long REDIS_WHU_USER_AUTH_EXPIRE_TIME = 2 * 60L;//过期时间2分钟
 
+    /**
+     * redis mq
+     */
+    public static final String REDIS_LOCK_MQ_PREFIX = "redis:lock:mq:";
+
     /**
      * redis lock
      */
     public static final int MAX_RETRY_COUNT = 30;
     public static final long REDIS_CACHE_TIME_OUT = 60L;
+    public static final long REDIS_LOCK_MQ_TIME_OUT = 60L;
 
     /**
      * aes相关
@@ -153,6 +159,17 @@ public class SystemConstant {
     public static final int THREAD_POOL_KEEP_ALIVE_SECONDS = 10;
     public static final int THREAD_POOL_QUEUE_CAPACITY = 500;
 
+    /**
+     * mq消息
+     */
+    public static final int DELIVERED_ACK_TYPE = 0;//消息"已发出",但尚未处理结束
+    public static final int POSION_ACK_TYPE = 1;//消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)
+    public static final int STANDARD_ACK_TYPE = 2;//"标准"类型,通常表示为消息"处理成功",broker端可以删除消息了
+    public static final int REDELIVERED_ACK_TYPE = 3;//消息需"重发",比如consumer处理消息时抛出了异常,broker稍后会重新发送此消息
+    public static final int INDIVIDUAL_ACK_TYPE = 4;//表示只确认"单条消息",无论在任何ACK_MODE下
+    public static final int UNMATCHED_ACK_TYPE = 5;//BROKER间转发消息时,接收端"拒绝"消息
+    public static final int UNSEND_ACK_TYPE = 6;//消息未发出
+
     /**
      * 初始化附件文件路径
      */
@@ -248,7 +265,7 @@ public class SystemConstant {
         }
     }
 
-    public static boolean strNotNull(String str){
+    public static boolean strNotNull(String str) {
         return str != null && str.length() > 0 && !str.equals("null");
     }
 }

+ 61 - 0
teachcloud-common/src/main/java/com/qmth/teachcloud/common/enums/MqTagEnum.java

@@ -0,0 +1,61 @@
+package com.qmth.teachcloud.common.enums;
+
+import java.util.Objects;
+
+/**
+ * @Description: mq topic enum
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/28
+ */
+public enum MqTagEnum {
+
+    PDF("创建pdf标签", "pdf", "normal", 0);
+
+    private MqTagEnum(String desc, String code, String type, int id) {
+        this.desc = desc;
+        this.code = code;
+        this.type = type;
+        this.id = id;
+    }
+
+    private String code;//留痕描述
+
+    private String desc;//标签
+
+    private String type;//消息类型,normal:正常消息,delay:延时消息,transactional:事务消息,broadcast:广播消息
+
+    private int id;
+
+    /**
+     * 状态转换 toName
+     *
+     * @param value
+     * @return
+     */
+    public static String convertToName(String value) {
+        for (MqTagEnum e : MqTagEnum.values()) {
+            if (Objects.equals(value.trim(), e.getCode())) {
+                return e.name();
+            }
+        }
+        return null;
+    }
+
+    public String getCode() {
+        return code;
+    }
+
+    public String getDesc() {
+        return desc;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public int getId() {
+        return id;
+    }
+}

+ 12 - 0
teachcloud-common/src/main/java/com/qmth/teachcloud/common/util/RedisUtil.java

@@ -1,5 +1,6 @@
 package com.qmth.teachcloud.common.util;
 
+import com.qmth.teachcloud.common.bean.dto.MqDto;
 import com.qmth.teachcloud.common.contant.SystemConstant;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
@@ -262,4 +263,15 @@ public class RedisUtil {
     public void expire(String key, long timeOut, TimeUnit timeUnit) {
         redisTemplate.expire(key, timeOut, timeUnit);
     }
+
+    /**
+     * 发送消息
+     *
+     * @param topic
+     * @param mqDto
+     */
+    public void sendMessage(String topic, MqDto mqDto) {
+        mqDto.setAck(SystemConstant.DELIVERED_ACK_TYPE);
+        redisTemplate.convertAndSend(topic, JacksonUtil.parseJson(mqDto));
+    }
 }