wangliang 5 ani în urmă
părinte
comite
bdb10125df

+ 5 - 25
themis-backend/src/main/java/com/qmth/themis/backend/api/TBUserController.java

@@ -3,17 +3,17 @@ package com.qmth.themis.backend.api;
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.qmth.themis.backend.config.DictionaryConfig;
-import com.qmth.themis.backend.quartz.MqJob;
 import com.qmth.themis.backend.util.ServletUtil;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.dto.AuthDto;
-import com.qmth.themis.business.dto.MqDto;
 import com.qmth.themis.business.entity.TBSession;
 import com.qmth.themis.business.entity.TBUser;
 import com.qmth.themis.business.enums.MqEnum;
 import com.qmth.themis.business.enums.RoleEnum;
 import com.qmth.themis.business.enums.SystemOperationEnum;
-import com.qmth.themis.business.service.*;
+import com.qmth.themis.business.service.EhcacheService;
+import com.qmth.themis.business.service.MqDtoService;
+import com.qmth.themis.business.service.TBUserService;
 import com.qmth.themis.business.util.EhcacheUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.business.util.SessionUtil;
@@ -65,9 +65,6 @@ public class TBUserController {
     @Resource
     EhcacheService ehcacheService;
 
-    @Resource
-    QuartzService quartzService;
-
     @Resource
     DictionaryConfig dictionaryConfig;
 
@@ -122,8 +119,8 @@ public class TBUserController {
         TBSession tbSession = new TBSession(sessionId, String.valueOf(user.getId()), authDto.getRoleEnum().name(), platform.getSource(), platform.name(), deviceId, request.getLocalAddr(), token, expire);
         redisUtil.setUserSession(sessionId, tbSession);
         //mq发送消息start
-        mqDtoService.assembleSendOneWayMsg(dictionaryConfig.mqConfigDomain().getSessionTopic(), platform.getSource(), tbSession, MqEnum.SESSION.name(), tbSession.getId());
-        mqDtoService.assembleSendOneWayMsg(dictionaryConfig.mqConfigDomain().getUserLogTopic(), Objects.equals(authDto.getRoleEnum().name(), RoleEnum.STUDENT.name()) ? dictionaryConfig.mqConfigDomain().getUserLogTopicStudentTag() : dictionaryConfig.mqConfigDomain().getUserLogTopicUserTag(), SystemOperationEnum.LOGIN, MqEnum.USER_LOG.name(), user.getId());
+        mqDtoService.assembleSendOneWayMsg(dictionaryConfig.mqConfigDomain().getSessionTopic(), platform.getSource(), tbSession, MqEnum.SESSION.name(), tbSession.getId(), user.getLoginName());
+        mqDtoService.assembleSendOneWayMsg(dictionaryConfig.mqConfigDomain().getUserLogTopic(), Objects.equals(authDto.getRoleEnum().name(), RoleEnum.STUDENT.name()) ? dictionaryConfig.mqConfigDomain().getUserLogTopicStudentTag() : dictionaryConfig.mqConfigDomain().getUserLogTopicUserTag(), SystemOperationEnum.LOGIN, MqEnum.USER_LOG.name(), user.getId(), user.getLoginName());
         //mq发送消息end
         //测试
         String test = SignatureInfo.build(SignatureType.TOKEN, sessionId, token);
@@ -138,23 +135,6 @@ public class TBUserController {
     @ApiOperation(value = "用户查询接口")
     @RequestMapping(value = "/list", method = RequestMethod.GET)
     public Result list() {
-        Map<String, Object> map = new HashMap<>();
-        map.put("name", MqJob.class.getName());
-        quartzService.deleteJob(dictionaryConfig.quartzConfigDomain().getJobName(), dictionaryConfig.quartzConfigDomain().getJobGroupName());
-        quartzService.addJob(MqJob.class, dictionaryConfig.quartzConfigDomain().getJobName(), dictionaryConfig.quartzConfigDomain().getJobGroupName(), "0 0/1 * * * ?", map);
-
-//        HashMap<String, Object> map = new HashMap<>();
-//        map.put("name", 1);
-//        quartzService.deleteJob("job", "test");
-//        quartzService.addJob(Job.class, "job", "test", "0 * * * * ?", map);
-//
-//        map.put("name", 2);
-//        quartzService.deleteJob("job2", "test");
-//        quartzService.addJob(Job.class, "job2", "test", "10 * * * * ?", map);
-//
-//        map.put("name", 3);
-//        quartzService.deleteJob("job3", "test2");
-//        quartzService.addJob(Job.class, "job3", "test2", "15 * * * * ?", map);
         return ResultUtil.ok(SystemConstant.SUCCESS);
     }
 

+ 3 - 3
themis-backend/src/main/java/com/qmth/themis/backend/quartz/MqJob.java

@@ -32,10 +32,10 @@ public class MqJob extends QuartzJobBean {
 
     @Override
     protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
-        Long size = redisUtil.getHashSize(SystemConstant.SESSION_TOPIC_ERROR_LIST);
+        Long size = redisUtil.getHashSize(SystemConstant.SESSION_TOPIC_BUFFER_LIST);
         if (Objects.nonNull(size) && size.longValue() > 0) {
-            log.info("session_topic有异常的消息数为:{}", size);
-            Map map = redisUtil.getHashEntries(SystemConstant.SESSION_TOPIC_ERROR_LIST);
+            log.info("session_topic缓冲区的消息数为:{}", size);
+            Map map = redisUtil.getHashEntries(SystemConstant.SESSION_TOPIC_BUFFER_LIST);
             map.forEach((k, v) -> {
                 MqDto mqDto = (MqDto) v;
                 producerServer.sendOneWay(mqDto);

+ 42 - 0
themis-backend/src/main/java/com/qmth/themis/backend/start/StartRunning.java

@@ -0,0 +1,42 @@
+package com.qmth.themis.backend.start;
+
+import com.qmth.themis.backend.config.DictionaryConfig;
+import com.qmth.themis.backend.quartz.MqJob;
+import com.qmth.themis.business.service.QuartzService;
+import org.assertj.core.util.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.Map;
+
+/**
+ * @Description: 服务启动时初始化运行,哪个微服务模块需要则拿此模版去用
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/3
+ */
+@Component
+public class StartRunning implements CommandLineRunner {
+    private final static Logger log = LoggerFactory.getLogger(StartRunning.class);
+
+    @Resource
+    QuartzService quartzService;
+
+    @Resource
+    DictionaryConfig dictionaryConfig;
+
+    @Override
+    public void run(String... args) throws Exception {
+        log.info("服务器启动时执行 start");
+        log.info("增加mqjob start");
+        Map map = Maps.newHashMap("name", MqJob.class.getName());
+        quartzService.deleteJob(dictionaryConfig.quartzConfigDomain().getJobName(), dictionaryConfig.quartzConfigDomain().getJobGroupName());
+        quartzService.addJob(MqJob.class, dictionaryConfig.quartzConfigDomain().getJobName(), dictionaryConfig.quartzConfigDomain().getJobGroupName(), "0 0/1 * * * ?", map);
+        log.info("增加mqjob end");
+        log.info("服务器启动时执行 end");
+    }
+}

+ 13 - 2
themis-business/src/main/java/com/qmth/themis/business/constant/SystemConstant.java

@@ -41,8 +41,8 @@ public class SystemConstant {
     public static final String ALL_PATH = "/**";
     public static final String ACCOUNT = "account";
     public static final String ERROR = "/error";
-    public static final String SESSION_TOPIC_ERROR_LIST = "session:topic:error:list";
-    public static final String USERLOG_TOPIC_ERROR_LIST = "userLog:topic:error:list";
+    public static final String SESSION_TOPIC_BUFFER_LIST = "session:topic:buffer:list";
+    public static final String USERLOG_TOPIC_BUFFER_LIST = "userLog:topic:buffer:list";
     /**
      * session过期时间
      */
@@ -60,6 +60,8 @@ public class SystemConstant {
      */
     public static final int CONSUME_MESSAGE_BATCH_MAX_SIZE = 10;
     public static final int MAXRECONSUMETIMES = 3;
+    public static final String OBJ_ID = "objId";
+
     public static final int DELIVERED_ACK_TYPE = 0;//消息"已发出",但尚未处理结束
     public static final int POSION_ACK_TYPE = 1;//消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)
     public static final int STANDARD_ACK_TYPE = 2;//"标准"类型,通常表示为消息"处理成功",broker端可以删除消息了
@@ -67,6 +69,15 @@ public class SystemConstant {
     public static final int INDIVIDUAL_ACK_TYPE = 4;//表示只确认"单条消息",无论在任何ACK_MODE下
     public static final int UNMATCHED_ACK_TYPE = 5;//BROKER间转发消息时,接收端"拒绝"消息
     public static final int UNSEND_ACK_TYPE = 6;//消息未发出
+    /**
+     * 线程池配置
+     */
+    public static final String THREAD_POOL_NAME = "arbitrateThreadPool";
+    public static final int THREAD_POOL_CORE_POOL_SIZE = 20;
+    public static final int THREAD_POOL_MAX_POOL_SIZE = 40;
+    public static final int THREAD_POOL_KEEP_ALIVE_SECONDS = 60;
+    public static final int THREAD_POOL_QUEUE_CAPACITY = 100;
+
 
     /**
      * 获取过期时间

+ 21 - 11
themis-business/src/main/java/com/qmth/themis/business/dto/MqDto.java

@@ -1,6 +1,5 @@
 package com.qmth.themis.business.dto;
 
-import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.enums.MqEnum;
 
 import java.io.Serializable;
@@ -22,8 +21,9 @@ public class MqDto implements Serializable {
     private String tag;//消息tag
     private long timestamp;//时间戳
     private Object body;//消息体
-    private MqEnum type;//业务类型
-    private String bid;//关联业务id
+    private MqEnum type;//消息类型
+    private String objId;//关联业务id
+    private String objName;//关联业务名称
     private Integer ack;//ack
     private Integer sequence;//序号
     private Map<String, Object> properties;//扩展类型
@@ -32,22 +32,24 @@ public class MqDto implements Serializable {
 
     }
 
-    public MqDto(String topic, String tag, Object body, MqEnum type, String bid) {
+    public MqDto(String topic, String tag, Object body, MqEnum type, String objId, String objName) {
         this.topic = topic;
         this.tag = tag;
         this.body = body;
         this.type = type;
-        this.bid = bid;
+        this.objId = objId;
+        this.objName = objName;
         this.timestamp = System.currentTimeMillis();
         this.id = String.valueOf(UUID.randomUUID()).replaceAll("-", "");
     }
 
-    public MqDto(String topic, String tag, Object body, MqEnum type, String bid, Map properties) {
+    public MqDto(String topic, String tag, Object body, MqEnum type, String objId, Map properties, String objName) {
         this.topic = topic;
         this.tag = tag;
         this.body = body;
         this.type = type;
-        this.bid = bid;
+        this.objId = objId;
+        this.objName = objName;
         this.timestamp = System.currentTimeMillis();
         this.id = String.valueOf(UUID.randomUUID()).replaceAll("-", "");
         this.properties = properties;
@@ -105,12 +107,20 @@ public class MqDto implements Serializable {
         this.body = body;
     }
 
-    public String getBid() {
-        return bid;
+    public String getObjId() {
+        return objId;
     }
 
-    public void setBid(String bid) {
-        this.bid = bid;
+    public void setObjId(String objId) {
+        this.objId = objId;
+    }
+
+    public String getObjName() {
+        return objName;
+    }
+
+    public void setObjName(String objName) {
+        this.objName = objName;
     }
 
     public Map<String, Object> getProperties() {

+ 6 - 12
themis-business/src/main/java/com/qmth/themis/business/listener/RocketSessionConsumer.java

@@ -66,19 +66,13 @@ public class RocketSessionConsumer implements
                 MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
                 log.info("sessionConsumer接受到的消息:{}", JSONObject.toJSONString(mqDto));
                 log.info("sessionConsumer mqDto sequence:{},tag:{}", mqDto.getSequence(), mqDto.getTag());
-                MqDto redisMqdto = (MqDto) redisUtil.getSessionTopicList(SystemConstant.SESSION_TOPIC_ERROR_LIST, mqDto.getId());
-                if (Objects.nonNull(redisMqdto)) {
-                    if (Objects.nonNull(redisMqdto.getAck()) && redisMqdto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
-                        log.info("更新db");
-                        tbSessionService.saveSessionInfo(toJavaObject((JSON) mqDto.getBody(), TBSession.class), redisMqdto.getTimestamp());
-                        redisUtil.deleteSessionTopicList(SystemConstant.SESSION_TOPIC_ERROR_LIST, redisMqdto.getId());
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                    } else {
-                        log.info("消息ack未确认,重发");
-                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-                    }
+                if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
+                    log.info("更新db");
+                    tbSessionService.saveSessionInfo(toJavaObject((JSON) mqDto.getBody(), TBSession.class), mqDto.getTimestamp());
+                    redisUtil.deleteSessionTopicList(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId());
+                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                 } else {
-                    log.info("消息数据为空,重发消息");
+                    log.info("消息ack未确认,重发");
                     return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
                 }
             }

+ 6 - 12
themis-business/src/main/java/com/qmth/themis/business/listener/RocketUserLogConsumer.java

@@ -55,19 +55,13 @@ public class RocketUserLogConsumer implements MessageListenerConcurrently {
                 MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
                 log.info("userLogConsumer接受到的消息:{}", JSONObject.toJSONString(mqDto));
                 log.info("userLogConsumer mqDto sequence:{},tag:{}", mqDto.getSequence(), mqDto.getTag());
-                MqDto redisMqdto = (MqDto) redisUtil.getSessionTopicList(SystemConstant.USERLOG_TOPIC_ERROR_LIST, mqDto.getId());
-                if (Objects.nonNull(redisMqdto)) {
-                    if (Objects.nonNull(redisMqdto.getAck()) && redisMqdto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
-                        log.info("插入用户轨迹日志");
-                        teUserLogService.saveUserLogInfo(redisMqdto.getTimestamp(), mqDto.getBid(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JSONObject.toJSONString(mqDto));
-                        redisUtil.deleteSessionTopicList(SystemConstant.USERLOG_TOPIC_ERROR_LIST, redisMqdto.getId());
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                    } else {
-                        log.info("消息ack未确认,重发");
-                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-                    }
+                if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
+                    log.info("插入用户轨迹日志");
+                    teUserLogService.saveUserLogInfo(mqDto.getTimestamp(), mqDto.getObjId(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JSONObject.toJSONString(mqDto));
+                    redisUtil.deleteSessionTopicList(SystemConstant.USERLOG_TOPIC_BUFFER_LIST, mqDto.getId());
+                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                 } else {
-                    log.info("消息数据为空,重发消息");
+                    log.info("消息ack未确认,重发");
                     return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
                 }
             }

+ 2 - 2
themis-business/src/main/java/com/qmth/themis/business/service/TBSessionService.java

@@ -16,8 +16,8 @@ public interface TBSessionService extends IService<TBSession> {
      * 保存session信息
      *
      * @param tbSession
-     * @param redisTimestamp
+     * @param mqTimestamp
      * @return
      */
-    public boolean saveSessionInfo(TBSession tbSession, long redisTimestamp);
+    public boolean saveSessionInfo(TBSession tbSession, long mqTimestamp);
 }

+ 2 - 2
themis-business/src/main/java/com/qmth/themis/business/service/TEUserLogService.java

@@ -15,9 +15,9 @@ public interface TEUserLogService extends IService<TEUserLog> {
     /**
      * 保存用户轨迹
      *
-     * @param redisTimestamp
+     * @param mqTimestamp
      * @param o
      * @return
      */
-    public boolean saveUserLogInfo(long redisTimestamp, Object... o);
+    public boolean saveUserLogInfo(long mqTimestamp, Object... o);
 }

+ 16 - 13
themis-business/src/main/java/com/qmth/themis/business/service/impl/MqDtoServiceImpl.java

@@ -36,32 +36,35 @@ public class MqDtoServiceImpl implements MqDtoService {
     @Override
     public MqDto assembleSendOneWayMsg(Object... o) {
         MqDto mqDto = null;
-        try {
+        MqEnum mqEnum = MqEnum.valueOf(String.valueOf(o[3]));
 //            for (int i = 0; i < 100; i++) {
 //                TBSession tbSession = new TBSession(sessionId, String.valueOf(user.getId()), authDto.getRoleEnum().name(), platform.getSource(), platform.name(), deviceId, request.getLocalAddr(), token, expire);
 //                redisUtil.setUserSession(sessionId, tbSession);
 //                int random = (int) (Math.random() * Source.values().length);
-            //往mq发送消息插入会话信息
-            MqEnum mqEnum = MqEnum.valueOf(String.valueOf(o[3]));
-            mqDto = new MqDto(String.valueOf(o[0]), String.valueOf(o[1]), o[2], mqEnum, String.valueOf(o[4]));
+        //往mq发送消息插入会话信息
+        mqDto = new MqDto(String.valueOf(o[0]), String.valueOf(o[1]), o[2], mqEnum, String.valueOf(o[4]), String.valueOf(o[5]));
 //                mqDto = new MqDto(SystemConstant.SESSION_TOPIC, Source.values()[random].name(), tbSession, MqEnum.SESSION, tbSession.getId());
 //                mqDto.setSequence(i);
-            mqDto.setAck(SystemConstant.DELIVERED_ACK_TYPE);
+        mqDto.setAck(SystemConstant.DELIVERED_ACK_TYPE);
+        try {
             producerServer.sendOneWay(mqDto);
-            switch (mqEnum.ordinal()) {
-                case 0:
-                    redisUtil.setSessionTopicList(SystemConstant.SESSION_TOPIC_ERROR_LIST, mqDto.getId(), mqDto);
-                    break;
-                default:
-                    redisUtil.setSessionTopicList(SystemConstant.USERLOG_TOPIC_ERROR_LIST, mqDto.getId(), mqDto);
-                    break;
-            }
 //            }
         } catch (Exception e) {
             e.printStackTrace();
             if (Objects.nonNull(mqDto)) {
                 mqDto.setAck(SystemConstant.UNSEND_ACK_TYPE);
             }
+        } finally {
+            if (Objects.nonNull(mqDto)) {
+                switch (mqEnum.ordinal()) {
+                    case 0:
+                        redisUtil.setSessionTopicList(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId(), mqDto);
+                        break;
+                    default:
+                        redisUtil.setSessionTopicList(SystemConstant.USERLOG_TOPIC_BUFFER_LIST, mqDto.getId(), mqDto);
+                        break;
+                }
+            }
         }
         return mqDto;
     }

+ 4 - 4
themis-business/src/main/java/com/qmth/themis/business/service/impl/ProducerServerImpl.java

@@ -57,7 +57,7 @@ public class ProducerServerImpl implements ProducerServer {
     @Override
     public Result syncOrderlyMsg(MqDto mqDto) {
         log.info("syncOrderlyMsg mqDto:{}", JSONObject.toJSONString(mqDto));
-        SendResult sendResult = rocketMQTemplate.syncSendOrderly(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto, mqDto.getBid());
+        SendResult sendResult = rocketMQTemplate.syncSendOrderly(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto, mqDto.getObjId());
         // 同步顺序消息发送成功会有一个返回值,我们可以用这个返回值进行判断和获取一些信息
         log.info("sendResult:{}", JSONObject.toJSONString(sendResult));
         return ResultUtil.ok(SystemConstant.SUCCESS);
@@ -97,7 +97,7 @@ public class ProducerServerImpl implements ProducerServer {
     @Override
     public Result asyncOrderlyMsg(MqDto mqDto) {
         log.info("asyncOrderlyMsg mqDto:{}", JSONObject.toJSONString(mqDto));
-        rocketMQTemplate.asyncSendOrderly(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto, mqDto.getBid(), new SendCallback() {
+        rocketMQTemplate.asyncSendOrderly(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto, mqDto.getObjId(), new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
                 // 成功回调
@@ -135,7 +135,7 @@ public class ProducerServerImpl implements ProducerServer {
     @Override
     public Result sendOneWayOrderly(MqDto mqDto) {
         log.info("sendOneWayOrderly mqDto:{}", JSONObject.toJSONString(mqDto));
-        rocketMQTemplate.sendOneWayOrderly(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto, mqDto.getBid());
+        rocketMQTemplate.sendOneWayOrderly(mqDto.getTopic() + ":" + mqDto.getTag(), mqDto, mqDto.getObjId());
         return ResultUtil.ok(SystemConstant.SUCCESS);
     }
 
@@ -167,7 +167,7 @@ public class ProducerServerImpl implements ProducerServer {
         message.setTopic(mqDto.getTopic());
         message.setBody(String.valueOf(mqDto.getBody()).getBytes());
         message.setTags(mqDto.getTag());
-        message.putUserProperty("bid", String.valueOf(mqDto.getBid()));
+        message.putUserProperty(SystemConstant.OBJ_ID, String.valueOf(mqDto.getObjId()));
         return message;
     }
 }

+ 5 - 5
themis-business/src/main/java/com/qmth/themis/business/service/impl/TBSessionServiceImpl.java

@@ -27,11 +27,11 @@ public class TBSessionServiceImpl extends ServiceImpl<TBSessionMapper, TBSession
      * 保存session信息
      *
      * @param tbSession
-     * @param redisTimestamp
+     * @param mqTimestamp
      * @return
      */
     @Override
-    public boolean saveSessionInfo(TBSession tbSession, long redisTimestamp) {
+    public boolean saveSessionInfo(TBSession tbSession, long mqTimestamp) {
         TBSession tbSessionDb = this.getById(tbSession.getId());
         if (Objects.isNull(tbSessionDb)) {
             Gson gson = new Gson();
@@ -39,8 +39,8 @@ public class TBSessionServiceImpl extends ServiceImpl<TBSessionMapper, TBSession
             this.saveOrUpdate(tbSessionDb);
         } else if (Objects.nonNull(tbSessionDb.getUpdateTime())) {
             long dbtimestamp = tbSessionDb.getUpdateTime().getTime();
-            if (redisTimestamp > dbtimestamp) {
-                log.info("redis时间大于db时间,可以更新");
+            if (mqTimestamp > dbtimestamp) {
+                log.info("mq时间大于db时间,可以更新");
                 tbSessionDb.setDeviceId(tbSession.getDeviceId());
                 tbSessionDb.setAccessToken(tbSession.getAccessToken());
                 tbSessionDb.setLastAccessIp(tbSession.getLastAccessIp());
@@ -49,7 +49,7 @@ public class TBSessionServiceImpl extends ServiceImpl<TBSessionMapper, TBSession
                 tbSessionDb.setExpireTime(tbSession.getExpireTime());
                 this.saveOrUpdate(tbSessionDb);
             } else {
-                log.info("redis时间小于db时间,不予更新");
+                log.info("mq时间小于db时间,不予更新");
             }
         }
         return true;

+ 2 - 2
themis-business/src/main/java/com/qmth/themis/business/service/impl/TEUserLogServiceImpl.java

@@ -22,12 +22,12 @@ public class TEUserLogServiceImpl extends ServiceImpl<TEUserLogMapper, TEUserLog
     /**
      * 保存用户轨迹
      *
-     * @param redisTimestamp
+     * @param mqTimestamp
      * @param o
      * @return
      */
     @Override
-    public boolean saveUserLogInfo(long redisTimestamp, Object... o) {
+    public boolean saveUserLogInfo(long mqTimestamp, Object... o) {
         TEUserLog teUserLog = new TEUserLog(Long.parseLong(String.valueOf(o[0])), Integer.parseInt(String.valueOf(o[1])), String.valueOf(o[2]), String.valueOf(o[3]));
         this.save(teUserLog);
         return true;

+ 51 - 0
themis-business/src/main/java/com/qmth/themis/business/threadPool/MyThreadPool.java

@@ -0,0 +1,51 @@
+package com.qmth.themis.business.threadPool;
+
+import com.qmth.themis.business.constant.SystemConstant;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import javax.annotation.PostConstruct;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * @Description: 线程池应用配置
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2019/3/21
+ */
+@Configuration
+public class MyThreadPool extends ThreadPoolTaskExecutor {
+
+    public MyThreadPool arbitratePoolTaskExecutor = null;
+
+    @PostConstruct
+    public void init() {
+        arbitrateThreadPool();
+    }
+
+    /**
+     * 仲裁线程池
+     *
+     * @return
+     */
+    @Bean
+    public Executor arbitrateThreadPool() {
+        if (arbitratePoolTaskExecutor == null) {
+            arbitratePoolTaskExecutor = new MyThreadPool();
+            arbitratePoolTaskExecutor.setCorePoolSize(SystemConstant.THREAD_POOL_CORE_POOL_SIZE);//核心线程数
+            arbitratePoolTaskExecutor.setMaxPoolSize(SystemConstant.THREAD_POOL_MAX_POOL_SIZE);//最大线程数
+            arbitratePoolTaskExecutor.setKeepAliveSeconds(SystemConstant.THREAD_POOL_KEEP_ALIVE_SECONDS);//线程空闲时间
+            arbitratePoolTaskExecutor.setQueueCapacity(SystemConstant.THREAD_POOL_QUEUE_CAPACITY);//队列容量
+            arbitratePoolTaskExecutor.setThreadNamePrefix(SystemConstant.THREAD_POOL_NAME);
+
+            // rejection-policy:当pool已经达到max size的时候,如何处理新任务
+            // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
+            arbitratePoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+            arbitratePoolTaskExecutor.initialize();
+        }
+        return arbitratePoolTaskExecutor;
+    }
+}