Browse Source

考务导入接口

wangliang 5 năm trước cách đây
mục cha
commit
984ac73c65

+ 0 - 235
themis-backend/src/main/java/com/qmth/themis/backend/listener/RocketSessionConsumer.java

@@ -1,235 +0,0 @@
-package com.qmth.themis.backend.listener;
-
-import com.google.gson.Gson;
-import com.qmth.themis.business.constant.SystemConstant;
-import com.qmth.themis.business.entity.TBSession;
-import com.qmth.themis.business.entity.TMRocketMessage;
-import com.qmth.themis.business.service.TBSessionService;
-import com.qmth.themis.business.service.TMRocketMessageService;
-import com.qmth.themis.business.util.JacksonUtil;
-import com.qmth.themis.business.util.RedisUtil;
-import com.qmth.themis.common.contanst.Constants;
-import com.qmth.themis.mq.dto.MqDto;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
-import org.apache.rocketmq.spring.annotation.SelectorType;
-import org.apache.rocketmq.spring.core.RocketMQListener;
-import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-
-import javax.annotation.Resource;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @Description: 普通消息监听 session_topic
- * @Param:
- * @return:
- * @Author: wangliang
- * @Date: 2020/6/28
- */
-@Service
-public class RocketSessionConsumer implements
-//        MessageListenerOrderly
-        MessageListenerConcurrently //并发消费
-{
-
-    private final static Logger log = LoggerFactory.getLogger(RocketSessionConsumer.class);
-
-    @Resource
-    TBSessionService tbSessionService;
-
-    @Resource
-    RedisUtil redisUtil;
-
-    @Resource
-    TMRocketMessageService tmRocketMessageService;
-
-    /**
-     * 并发消费
-     *
-     * @param msgs
-     * @param consumeConcurrentlyContext
-     * @return
-     */
-    @Override
-    @Transactional
-    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
-        MqDto mqDto = null;
-        try {
-            long threadId = Thread.currentThread().getId();
-            String threadName = Thread.currentThread().getName();
-            Gson gson = new Gson();
-            for (MessageExt messageExt : msgs) {
-                log.info(":{}-:{} sessionConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
-                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
-                log.info(":{}-:{} sessionConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
-                log.info(":{}-:{} sessionConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
-                if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
-                    log.info(":{}-:{} 更新db", threadId, threadName);
-                    tbSessionService.saveSessionInfo(JacksonUtil.readJson(JacksonUtil.parseJson(mqDto.getBody()), TBSession.class), mqDto.getTimestamp());
-                    mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
-                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                    tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
-                    tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                    redisUtil.deleteMqTopicList(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId());
-                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                } else {
-                    log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
-                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-                }
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-        } finally {
-            if (Objects.nonNull(mqDto)) {
-                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId());
-            }
-        }
-        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
-    }
-
-//    /**
-//     * 顺序消费
-//     *
-//     * @param msgs
-//     * @param consumeOrderlyContext
-//     * @return
-//     */
-//    @Override
-//    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext
-//            consumeOrderlyContext) {
-//        try {
-//            for (MessageExt messageExt : msgs) {
-//                log.info("sessionConsumer重试次数:{}", messageExt.getReconsumeTimes());
-//                MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
-//                log.info("sessionConsumer接受到的消息:{}", JacksonUtil.parseJson(mqDto));
-//                log.info("mqDto sequence:{},tag:{}", mqDto.getSequence(), mqDto.getTag());
-//                MqDto redisMqdto = (MqDto) redisUtil.getSessionTopicList(mqDto.getId());
-//                if (Objects.nonNull(redisMqdto)) {
-//                    if (Objects.nonNull(redisMqdto.getAck()) && redisMqdto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
-//                        log.info("更新db");
-//                        tbSessionService.saveSessionInfo(toJavaObject((JSON) mqDto.getBody(), TBSession.class), redisMqdto.getTimestamp());
-//                        redisUtil.deleteSessionTopicList(redisMqdto.getId());
-//                        return ConsumeOrderlyStatus.SUCCESS;
-//                    } else {
-//                        log.info("消息ack未确认,重发");
-//                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
-//                    }
-//                } else {
-//                    log.info("消息数据为空,重发消息");
-//                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
-//                }
-//            }
-//        } catch (Exception e) {
-//            e.printStackTrace();
-//            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
-//        }
-//        return ConsumeOrderlyStatus.SUCCESS;//成功
-//    }
-
-    @Service
-    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWebGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWebTag}")
-    public class sessionConsumerWeb implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-
-        @Override
-        public void onMessage(Message message) {
-            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-        }
-
-        @Override
-        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-//            defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
-            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
-        }
-    }
-
-    @Service
-    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerPcGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicPcTag}")
-    public class sessionConsumerPc implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-
-        @Override
-        public void onMessage(Message message) {
-            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-        }
-
-        @Override
-        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
-        }
-    }
-
-    @Service
-    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWxappMonitorGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWxappMonitorTag}")
-    public class sessionConsumerWxappMonitor implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-
-        @Override
-        public void onMessage(Message message) {
-            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-        }
-
-        @Override
-        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
-        }
-    }
-
-    @Service
-    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWxappAnswerGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWxappAnswerTag}")
-    public class sessionConsumerWxappAnswer implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-
-        @Override
-        public void onMessage(Message message) {
-            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-        }
-
-        @Override
-        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
-        }
-    }
-
-    /**
-     * 死信队列
-     */
-//    @Service
-//    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerGroupDlq}", topic = "${mq.config.sessionTopicDlq}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicTag}")
-//    public class dlqSessionConsumer implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-//
-//        @Override
-//        public void onMessage(Message message) {
-//            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-//        }
-//
-//        @Override
-//        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-//            log.info("dlqSessionConsumer死信队列进来了");
-//            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-//            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-//            defaultMQPushConsumer.registerMessageListener(RocketConsumer.this::consumeMessage);
-//        }
-//    }
-}

+ 0 - 128
themis-backend/src/main/java/com/qmth/themis/backend/listener/RocketUserLogConsumer.java

@@ -1,128 +0,0 @@
-package com.qmth.themis.backend.listener;
-
-import com.google.gson.Gson;
-import com.qmth.themis.business.constant.SystemConstant;
-import com.qmth.themis.business.entity.TMRocketMessage;
-import com.qmth.themis.business.enums.MqEnum;
-import com.qmth.themis.business.enums.SystemOperationEnum;
-import com.qmth.themis.business.service.TEUserLogService;
-import com.qmth.themis.business.service.TMRocketMessageService;
-import com.qmth.themis.business.util.JacksonUtil;
-import com.qmth.themis.business.util.RedisUtil;
-import com.qmth.themis.common.contanst.Constants;
-import com.qmth.themis.mq.dto.MqDto;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
-import org.apache.rocketmq.spring.annotation.SelectorType;
-import org.apache.rocketmq.spring.core.RocketMQListener;
-import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-
-import javax.annotation.Resource;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @Description: 普通消息监听 用户日志
- * @Param:
- * @return:
- * @Author: wangliang
- * @Date: 2020/7/2
- */
-@Service
-public class RocketUserLogConsumer implements MessageListenerConcurrently {
-    private final static Logger log = LoggerFactory.getLogger(RocketUserLogConsumer.class);
-
-    @Resource
-    RedisUtil redisUtil;
-
-    @Resource
-    TEUserLogService teUserLogService;
-
-    @Resource
-    TMRocketMessageService tmRocketMessageService;
-
-    @Override
-    @Transactional
-    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
-        MqDto mqDto = null;
-        try {
-            long threadId = Thread.currentThread().getId();
-            String threadName = Thread.currentThread().getName();
-            Gson gson = new Gson();
-            for (MessageExt messageExt : msgs) {
-                log.info(":{}-:{} userLogConsumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
-//                MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
-                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
-                log.info(":{}-:{} userLogConsumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
-                log.info(":{}-:{} userLogConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
-                if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
-                    log.info(":{}-:{} 插入用户轨迹日志", threadId, threadName);
-                    teUserLogService.saveUserLogInfo(mqDto.getTimestamp(), mqDto.getObjId(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
-                    mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
-                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                    tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                    redisUtil.deleteMqTopicList(SystemConstant.USERLOG_TOPIC_BUFFER_LIST, mqDto.getId());
-                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                } else {
-                    log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
-                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-                }
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-        } finally {
-            if (Objects.nonNull(mqDto)) {
-                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId());
-            }
-        }
-        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
-    }
-
-    @Service
-    @RocketMQMessageListener(consumerGroup = "${mq.config.userLogConsumerUserGroup}", topic = "${mq.config.userLogTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.userLogTopicUserTag}")
-    public class sessionConsumerUserLog implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-
-        @Override
-        public void onMessage(Message message) {
-            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-        }
-
-        @Override
-        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-            defaultMQPushConsumer.registerMessageListener(RocketUserLogConsumer.this::consumeMessage);
-        }
-    }
-
-    @Service
-    @RocketMQMessageListener(consumerGroup = "${mq.config.userLogConsumerStudentGroup}", topic = "${mq.config.userLogTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.userLogTopicStudentTag}")
-    public class sessionConsumerStudentLog implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-
-        @Override
-        public void onMessage(Message message) {
-            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-        }
-
-        @Override
-        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-            defaultMQPushConsumer.registerMessageListener(RocketUserLogConsumer.this::consumeMessage);
-        }
-    }
-}

+ 31 - 0
themis-business/src/main/java/com/qmth/themis/business/constant/SystemConstant.java

@@ -6,6 +6,7 @@ import com.qmth.themis.common.enums.Source;
 
 import java.util.Calendar;
 import java.util.Date;
+import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -132,4 +133,34 @@ public class SystemConstant {
         long diff = (System.currentTimeMillis() - timestamp) / 1000;
         return diff < -1 * Constants.SIGNATURE_AHEAD_SECONDS || diff > Constants.SIGNATURE_EXPIRE_SECONDS;
     }
+
+    /**
+     * 毫秒时间转换
+     *
+     * @param map
+     * @return
+     */
+    public static Map timeTransform(Map map) {
+        if (Objects.nonNull(map.get("createTime")) && map.get("createTime") instanceof Long) {
+            Date date = new Date();
+            date.setTime(Long.parseLong(String.valueOf(map.get("createTime"))));
+            map.put("createTime", date);
+        }
+        if (Objects.nonNull(map.get("startTime")) && map.get("startTime") instanceof Long) {
+            Date date = new Date();
+            date.setTime(Long.parseLong(String.valueOf(map.get("startTime"))));
+            map.put("startTime", date);
+        }
+        if (Objects.nonNull(map.get("finishTime")) && map.get("finishTime") instanceof Long) {
+            Date date = new Date();
+            date.setTime(Long.parseLong(String.valueOf(map.get("finishTime"))));
+            map.put("finishTime", date);
+        }
+        if (Objects.nonNull(map.get("updateTime")) && map.get("updateTime") instanceof Long) {
+            Date date = new Date();
+            date.setTime(Long.parseLong(String.valueOf(map.get("updateTime"))));
+            map.put("updateTime", date);
+        }
+        return map;
+    }
 }

+ 3 - 32
themis-business/src/main/java/com/qmth/themis/business/forkjoin/ForkJoinTask.java

@@ -3,6 +3,7 @@ package com.qmth.themis.business.forkjoin;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.google.gson.Gson;
 import com.qmth.themis.business.constant.SpringContextHolder;
+import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.dto.ExamStudentDtoImport;
 import com.qmth.themis.business.entity.*;
 import com.qmth.themis.business.enums.RoleEnum;
@@ -45,7 +46,7 @@ public class ForkJoinTask extends RecursiveTask<List<TEExamStudent>> {
         Gson gson = new Gson();
         List<TEExamStudent> teExamStudentList = new ArrayList<>();
         Map tbTaskHistoryMap = (Map) map.get("tbTaskHistory");
-        tbTaskHistoryMap = timeTransform(tbTaskHistoryMap);
+        tbTaskHistoryMap = SystemConstant.timeTransform(tbTaskHistoryMap);
         TBTaskHistory tbTaskHistory = gson.fromJson(gson.toJson(tbTaskHistoryMap), TBTaskHistory.class);
         Long examId = Long.parseLong(String.valueOf(map.get("examId")));
         Long orgId = Long.parseLong(String.valueOf(map.get("orgId")));
@@ -59,7 +60,7 @@ public class ForkJoinTask extends RecursiveTask<List<TEExamStudent>> {
             for (int i = start; i <= end; i++) {
                 ExamStudentDtoImport examStudentDtoImport = (ExamStudentDtoImport) examStudentDtoImportList.get(i);
                 Map m = (Map) teExamActivityMap.get(examStudentDtoImport.getExamActivityCode());
-                m = timeTransform(m);
+                m = SystemConstant.timeTransform(m);
                 TEExamActivity teExamActivity = gson.fromJson(gson.toJson(m), TEExamActivity.class);
                 if (Objects.isNull(teExamActivity)) {
                     throw new BusinessException("没有" + examStudentDtoImport.getExamActivityCode() + "的考场");
@@ -122,34 +123,4 @@ public class ForkJoinTask extends RecursiveTask<List<TEExamStudent>> {
         }
         return teExamStudentList;
     }
-
-    /**
-     * 毫秒时间转换
-     *
-     * @param map
-     * @return
-     */
-    Map timeTransform(Map map) {
-        if (Objects.nonNull(map.get("createTime")) && map.get("createTime") instanceof Long) {
-            Date date = new Date();
-            date.setTime(Long.parseLong(String.valueOf(map.get("createTime"))));
-            map.put("createTime", date);
-        }
-        if (Objects.nonNull(map.get("startTime")) && map.get("startTime") instanceof Long) {
-            Date date = new Date();
-            date.setTime(Long.parseLong(String.valueOf(map.get("startTime"))));
-            map.put("startTime", date);
-        }
-        if (Objects.nonNull(map.get("finishTime")) && map.get("finishTime") instanceof Long) {
-            Date date = new Date();
-            date.setTime(Long.parseLong(String.valueOf(map.get("finishTime"))));
-            map.put("finishTime", date);
-        }
-        if (Objects.nonNull(map.get("updateTime")) && map.get("updateTime") instanceof Long) {
-            Date date = new Date();
-            date.setTime(Long.parseLong(String.valueOf(map.get("updateTime"))));
-            map.put("updateTime", date);
-        }
-        return map;
-    }
 }

+ 207 - 201
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketSessionConsumer.java

@@ -1,160 +1,223 @@
-//package com.qmth.themis.mq.listener;
-//
-//import com.google.gson.Gson;
-//import com.qmth.themis.business.constant.SystemConstant;
-//import com.qmth.themis.business.entity.TBSession;
-//import com.qmth.themis.business.entity.TMRocketMessage;
-//import com.qmth.themis.business.service.TBSessionService;
-//import com.qmth.themis.business.service.TMRocketMessageService;
-//import com.qmth.themis.business.util.JacksonUtil;
-//import com.qmth.themis.business.util.RedisUtil;
-//import com.qmth.themis.common.contanst.Constants;
-//import com.qmth.themis.mq.dto.MqDto;
-//import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-//import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-//import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-//import org.apache.rocketmq.common.message.Message;
-//import org.apache.rocketmq.common.message.MessageExt;
-//import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
-//import org.apache.rocketmq.spring.annotation.SelectorType;
-//import org.apache.rocketmq.spring.core.RocketMQListener;
-//import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//import org.springframework.stereotype.Service;
-//import org.springframework.transaction.annotation.Transactional;
-//
-//import javax.annotation.Resource;
-//import java.util.List;
-//import java.util.Objects;
-//
-///**
-// * @Description: 普通消息监听 session_topic
-// * @Param:
-// * @return:
-// * @Author: wangliang
-// * @Date: 2020/6/28
-// */
-//@Service
-//public class RocketSessionConsumer implements
-////        MessageListenerOrderly
-//        MessageListenerConcurrently //并发消费
-//{
-//
-//    private final static Logger log = LoggerFactory.getLogger(RocketSessionConsumer.class);
-//
-//    @Resource
-//    TBSessionService tbSessionService;
-//
-//    @Resource
-//    RedisUtil redisUtil;
-//
-//    @Resource
-//    TMRocketMessageService tmRocketMessageService;
-//
+package com.qmth.themis.mq.listener;
+
+import com.google.gson.Gson;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.entity.TBSession;
+import com.qmth.themis.business.entity.TMRocketMessage;
+import com.qmth.themis.business.service.TBSessionService;
+import com.qmth.themis.business.service.TMRocketMessageService;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.common.contanst.Constants;
+import com.qmth.themis.mq.dto.MqDto;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.annotation.SelectorType;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @Description: 普通消息监听 session_topic
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/6/28
+ */
+@Service
+public class RocketSessionConsumer implements
+//        MessageListenerOrderly
+        MessageListenerConcurrently //并发消费
+{
+
+    private final static Logger log = LoggerFactory.getLogger(RocketSessionConsumer.class);
+
+    @Resource
+    TBSessionService tbSessionService;
+
+    @Resource
+    RedisUtil redisUtil;
+
+    @Resource
+    TMRocketMessageService tmRocketMessageService;
+
+    /**
+     * 并发消费
+     *
+     * @param msgs
+     * @param consumeConcurrentlyContext
+     * @return
+     */
+    @Override
+    @Transactional
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+        MqDto mqDto = null;
+        try {
+            long threadId = Thread.currentThread().getId();
+            String threadName = Thread.currentThread().getName();
+            Gson gson = new Gson();
+            for (MessageExt messageExt : msgs) {
+                log.info(":{}-:{} sessionConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
+                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
+                log.info(":{}-:{} sessionConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
+                log.info(":{}-:{} sessionConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
+                if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.getMqTopicList(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
+                    log.info(":{}-:{} 更新db", threadId, threadName);
+                    tbSessionService.saveSessionInfo(JacksonUtil.readJson(JacksonUtil.parseJson(mqDto.getBody()), TBSession.class), mqDto.getTimestamp());
+                    mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+                    tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
+                    tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+                    redisUtil.deleteMqTopicList(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId());
+                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                } else {
+                    log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
+                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+        } finally {
+            if (Objects.nonNull(mqDto)) {
+                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId());
+            }
+        }
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+    }
+
 //    /**
-//     * 并发消费
+//     * 顺序消费
 //     *
 //     * @param msgs
-//     * @param consumeConcurrentlyContext
+//     * @param consumeOrderlyContext
 //     * @return
 //     */
 //    @Override
-//    @Transactional
-//    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+//    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext
+//            consumeOrderlyContext) {
 //        try {
-//            long threadId = Thread.currentThread().getId();
-//            String threadName = Thread.currentThread().getName();
-//            Gson gson = new Gson();
 //            for (MessageExt messageExt : msgs) {
-//                log.info(":{}-:{} sessionConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
-//                MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
-//                log.info(":{}-:{} sessionConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
-//                log.info(":{}-:{} sessionConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
-//                if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
-//                    log.info(":{}-:{} 更新db", threadId, threadName);
-//                    tbSessionService.saveSessionInfo(JacksonUtil.readJson(JacksonUtil.parseJson(mqDto.getBody()), TBSession.class), mqDto.getTimestamp());
-//                    mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
-//                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-//                    tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
-//                    tmRocketMessageService.save(tmRocketMessage);
-//                    redisUtil.deleteSessionTopicList(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId());
-//                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+//                log.info("sessionConsumer重试次数:{}", messageExt.getReconsumeTimes());
+//                MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
+//                log.info("sessionConsumer接受到的消息:{}", JacksonUtil.parseJson(mqDto));
+//                log.info("mqDto sequence:{},tag:{}", mqDto.getSequence(), mqDto.getTag());
+//                MqDto redisMqdto = (MqDto) redisUtil.getSessionTopicList(mqDto.getId());
+//                if (Objects.nonNull(redisMqdto)) {
+//                    if (Objects.nonNull(redisMqdto.getAck()) && redisMqdto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
+//                        log.info("更新db");
+//                        tbSessionService.saveSessionInfo(toJavaObject((JSON) mqDto.getBody(), TBSession.class), redisMqdto.getTimestamp());
+//                        redisUtil.deleteSessionTopicList(redisMqdto.getId());
+//                        return ConsumeOrderlyStatus.SUCCESS;
+//                    } else {
+//                        log.info("消息ack未确认,重发");
+//                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
+//                    }
 //                } else {
-//                    log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
-//                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+//                    log.info("消息数据为空,重发消息");
+//                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
 //                }
 //            }
 //        } catch (Exception e) {
 //            e.printStackTrace();
-//            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-//        }
-//        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
-//    }
-//
-////    /**
-////     * 顺序消费
-////     *
-////     * @param msgs
-////     * @param consumeOrderlyContext
-////     * @return
-////     */
-////    @Override
-////    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext
-////            consumeOrderlyContext) {
-////        try {
-////            for (MessageExt messageExt : msgs) {
-////                log.info("sessionConsumer重试次数:{}", messageExt.getReconsumeTimes());
-////                MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
-////                log.info("sessionConsumer接受到的消息:{}", JacksonUtil.parseJson(mqDto));
-////                log.info("mqDto sequence:{},tag:{}", mqDto.getSequence(), mqDto.getTag());
-////                MqDto redisMqdto = (MqDto) redisUtil.getSessionTopicList(mqDto.getId());
-////                if (Objects.nonNull(redisMqdto)) {
-////                    if (Objects.nonNull(redisMqdto.getAck()) && redisMqdto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
-////                        log.info("更新db");
-////                        tbSessionService.saveSessionInfo(toJavaObject((JSON) mqDto.getBody(), TBSession.class), redisMqdto.getTimestamp());
-////                        redisUtil.deleteSessionTopicList(redisMqdto.getId());
-////                        return ConsumeOrderlyStatus.SUCCESS;
-////                    } else {
-////                        log.info("消息ack未确认,重发");
-////                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
-////                    }
-////                } else {
-////                    log.info("消息数据为空,重发消息");
-////                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
-////                }
-////            }
-////        } catch (Exception e) {
-////            e.printStackTrace();
-////            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
-////        }
-////        return ConsumeOrderlyStatus.SUCCESS;//成功
-////    }
-//
-//    @Service
-//    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWebGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWebTag}")
-//    public class sessionConsumerWeb implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-//
-//        @Override
-//        public void onMessage(Message message) {
-//            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-//        }
-//
-//        @Override
-//        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-//            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-//            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-//            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-////            defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
-//            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
+//            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
 //        }
+//        return ConsumeOrderlyStatus.SUCCESS;//成功
 //    }
-//
+
+    @Service
+    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWebGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWebTag}")
+    public class sessionConsumerWeb implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
+
+        @Override
+        public void onMessage(Message message) {
+            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
+        }
+
+        @Override
+        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
+            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
+            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
+//            defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
+            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
+        }
+    }
+
+    @Service
+    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerPcGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicPcTag}")
+    public class sessionConsumerPc implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
+
+        @Override
+        public void onMessage(Message message) {
+            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
+        }
+
+        @Override
+        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
+            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
+            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
+            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
+        }
+    }
+
+    @Service
+    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWxappMonitorGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWxappMonitorTag}")
+    public class sessionConsumerWxappMonitor implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
+
+        @Override
+        public void onMessage(Message message) {
+            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
+        }
+
+        @Override
+        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
+            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
+            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
+            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
+        }
+    }
+
+    @Service
+    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWxappAnswerGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWxappAnswerTag}")
+    public class sessionConsumerWxappAnswer implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
+
+        @Override
+        public void onMessage(Message message) {
+            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
+        }
+
+        @Override
+        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
+            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
+            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
+            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
+        }
+    }
+
+    /**
+     * 死信队列
+     */
 //    @Service
-//    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerPcGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicPcTag}")
-//    public class sessionConsumerPc implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
+//    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerGroupDlq}", topic = "${mq.config.sessionTopicDlq}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicTag}")
+//    public class dlqSessionConsumer implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
 //
 //        @Override
 //        public void onMessage(Message message) {
@@ -163,67 +226,10 @@
 //
 //        @Override
 //        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
+//            log.info("dlqSessionConsumer死信队列进来了");
 //            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-//            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 //            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-//            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
+//            defaultMQPushConsumer.registerMessageListener(RocketConsumer.this::consumeMessage);
 //        }
 //    }
-//
-//    @Service
-//    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWxappMonitorGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWxappMonitorTag}")
-//    public class sessionConsumerWxappMonitor implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-//
-//        @Override
-//        public void onMessage(Message message) {
-//            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-//        }
-//
-//        @Override
-//        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-//            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-//            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-//            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-//            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
-//        }
-//    }
-//
-//    @Service
-//    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWxappAnswerGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWxappAnswerTag}")
-//    public class sessionConsumerWxappAnswer implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-//
-//        @Override
-//        public void onMessage(Message message) {
-//            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-//        }
-//
-//        @Override
-//        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-//            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-//            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-//            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-//            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
-//        }
-//    }
-//
-//    /**
-//     * 死信队列
-//     */
-////    @Service
-////    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerGroupDlq}", topic = "${mq.config.sessionTopicDlq}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicTag}")
-////    public class dlqSessionConsumer implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-////
-////        @Override
-////        public void onMessage(Message message) {
-////            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-////        }
-////
-////        @Override
-////        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-////            log.info("dlqSessionConsumer死信队列进来了");
-////            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-////            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-////            defaultMQPushConsumer.registerMessageListener(RocketConsumer.this::consumeMessage);
-////        }
-////    }
-//}
+}

+ 128 - 122
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketUserLogConsumer.java

@@ -1,122 +1,128 @@
-//package com.qmth.themis.mq.listener;
-//
-//import com.google.gson.Gson;
-//import com.qmth.themis.business.constant.SystemConstant;
-//import com.qmth.themis.business.entity.TMRocketMessage;
-//import com.qmth.themis.business.enums.MqEnum;
-//import com.qmth.themis.business.enums.SystemOperationEnum;
-//import com.qmth.themis.business.service.TEUserLogService;
-//import com.qmth.themis.business.service.TMRocketMessageService;
-//import com.qmth.themis.business.util.JacksonUtil;
-//import com.qmth.themis.business.util.RedisUtil;
-//import com.qmth.themis.common.contanst.Constants;
-//import com.qmth.themis.mq.dto.MqDto;
-//import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-//import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-//import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-//import org.apache.rocketmq.common.message.Message;
-//import org.apache.rocketmq.common.message.MessageExt;
-//import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
-//import org.apache.rocketmq.spring.annotation.SelectorType;
-//import org.apache.rocketmq.spring.core.RocketMQListener;
-//import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//import org.springframework.stereotype.Service;
-//import org.springframework.transaction.annotation.Transactional;
-//
-//import javax.annotation.Resource;
-//import java.util.List;
-//import java.util.Objects;
-//
-///**
-// * @Description: 普通消息监听 用户日志
-// * @Param:
-// * @return:
-// * @Author: wangliang
-// * @Date: 2020/7/2
-// */
-//@Service
-//public class RocketUserLogConsumer implements MessageListenerConcurrently {
-//    private final static Logger log = LoggerFactory.getLogger(RocketUserLogConsumer.class);
-//
-//    @Resource
-//    RedisUtil redisUtil;
-//
-//    @Resource
-//    TEUserLogService teUserLogService;
-//
-//    @Resource
-//    TMRocketMessageService tmRocketMessageService;
-//
-//    @Override
-//    @Transactional
-//    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
-//        try {
-//            long threadId = Thread.currentThread().getId();
-//            String threadName = Thread.currentThread().getName();
-//            Gson gson = new Gson();
-//            for (MessageExt messageExt : msgs) {
-//                log.info(":{}-:{} userLogConsumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
-////                MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
-//                MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
-//                log.info(":{}-:{} userLogConsumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
-//                log.info(":{}-:{} userLogConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
-//                if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
-//                    log.info(":{}-:{} 插入用户轨迹日志", threadId, threadName);
-//                    teUserLogService.saveUserLogInfo(mqDto.getTimestamp(), mqDto.getObjId(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
-//                    mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
-//                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-//                    tmRocketMessageService.save(tmRocketMessage);
-//                    redisUtil.deleteSessionTopicList(SystemConstant.USERLOG_TOPIC_BUFFER_LIST, mqDto.getId());
-//                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-//                } else {
-//                    log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
-//                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-//                }
-//            }
-//        } catch (Exception e) {
-//            e.printStackTrace();
-//            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-//        }
-//        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
-//    }
-//
-//    @Service
-//    @RocketMQMessageListener(consumerGroup = "${mq.config.userLogConsumerUserGroup}", topic = "${mq.config.userLogTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.userLogTopicUserTag}")
-//    public class sessionConsumerUserLog implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-//
-//        @Override
-//        public void onMessage(Message message) {
-//            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-//        }
-//
-//        @Override
-//        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-//            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-//            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-//            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-//            defaultMQPushConsumer.registerMessageListener(RocketUserLogConsumer.this::consumeMessage);
-//        }
-//    }
-//
-//    @Service
-//    @RocketMQMessageListener(consumerGroup = "${mq.config.userLogConsumerStudentGroup}", topic = "${mq.config.userLogTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.userLogTopicStudentTag}")
-//    public class sessionConsumerStudentLog implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-//
-//        @Override
-//        public void onMessage(Message message) {
-//            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-//        }
-//
-//        @Override
-//        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-//            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-//            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-//            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-//            defaultMQPushConsumer.registerMessageListener(RocketUserLogConsumer.this::consumeMessage);
-//        }
-//    }
-//}
+package com.qmth.themis.mq.listener;
+
+import com.google.gson.Gson;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.entity.TMRocketMessage;
+import com.qmth.themis.business.enums.MqEnum;
+import com.qmth.themis.business.enums.SystemOperationEnum;
+import com.qmth.themis.business.service.TEUserLogService;
+import com.qmth.themis.business.service.TMRocketMessageService;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.common.contanst.Constants;
+import com.qmth.themis.mq.dto.MqDto;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.annotation.SelectorType;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @Description: 普通消息监听 用户日志
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/2
+ */
+@Service
+public class RocketUserLogConsumer implements MessageListenerConcurrently {
+    private final static Logger log = LoggerFactory.getLogger(RocketUserLogConsumer.class);
+
+    @Resource
+    RedisUtil redisUtil;
+
+    @Resource
+    TEUserLogService teUserLogService;
+
+    @Resource
+    TMRocketMessageService tmRocketMessageService;
+
+    @Override
+    @Transactional
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+        MqDto mqDto = null;
+        try {
+            long threadId = Thread.currentThread().getId();
+            String threadName = Thread.currentThread().getName();
+            Gson gson = new Gson();
+            for (MessageExt messageExt : msgs) {
+                log.info(":{}-:{} userLogConsumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
+//                MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
+                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
+                log.info(":{}-:{} userLogConsumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
+                log.info(":{}-:{} userLogConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
+                if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.getMqTopicList(SystemConstant.USERLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
+                    log.info(":{}-:{} 插入用户轨迹日志", threadId, threadName);
+                    teUserLogService.saveUserLogInfo(mqDto.getTimestamp(), mqDto.getObjId(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
+                    mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+                    tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+                    redisUtil.deleteMqTopicList(SystemConstant.USERLOG_TOPIC_BUFFER_LIST, mqDto.getId());
+                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                } else {
+                    log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
+                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+        } finally {
+            if (Objects.nonNull(mqDto)) {
+                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId());
+            }
+        }
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+    }
+
+    @Service
+    @RocketMQMessageListener(consumerGroup = "${mq.config.userLogConsumerUserGroup}", topic = "${mq.config.userLogTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.userLogTopicUserTag}")
+    public class sessionConsumerUserLog implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
+
+        @Override
+        public void onMessage(Message message) {
+            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
+        }
+
+        @Override
+        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
+            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
+            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
+            defaultMQPushConsumer.registerMessageListener(RocketUserLogConsumer.this::consumeMessage);
+        }
+    }
+
+    @Service
+    @RocketMQMessageListener(consumerGroup = "${mq.config.userLogConsumerStudentGroup}", topic = "${mq.config.userLogTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.userLogTopicStudentTag}")
+    public class sessionConsumerStudentLog implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
+
+        @Override
+        public void onMessage(Message message) {
+            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
+        }
+
+        @Override
+        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
+            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
+            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
+            defaultMQPushConsumer.registerMessageListener(RocketUserLogConsumer.this::consumeMessage);
+        }
+    }
+}

+ 5 - 3
themis-task/src/main/java/com/qmth/themis/task/listener/RocketTaskConsumer.java

@@ -4,8 +4,6 @@ import com.alibaba.fastjson.JSONObject;
 import com.google.gson.Gson;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.entity.TMRocketMessage;
-import com.qmth.themis.business.enums.MqEnum;
-import com.qmth.themis.business.service.TEUserLogService;
 import com.qmth.themis.business.service.TMRocketMessageService;
 import com.qmth.themis.business.templete.TaskImportTemplete;
 import com.qmth.themis.business.templete.impl.ExamStudentTemplete;
@@ -73,7 +71,7 @@ public class RocketTaskConsumer implements MessageListenerConcurrently {
                 mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                 log.info(":{}-:{} taskConsumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
                 log.info(":{}-:{} taskConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
-                if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
+                if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.getMqTopicList(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
                     Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
                     JSONObject jsonObject = JSONObject.parseObject(String.valueOf(map.get("remark")));
                     String type = String.valueOf(jsonObject.get("type"));
@@ -103,6 +101,10 @@ public class RocketTaskConsumer implements MessageListenerConcurrently {
         } catch (Exception e) {
             e.printStackTrace();
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+        } finally {
+            if (Objects.nonNull(mqDto)) {
+                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId());
+            }
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
     }