wangliang 5 سال پیش
والد
کامیت
0ec3ac4f30

+ 3 - 0
themis-backend/src/main/java/com/qmth/themis/backend/api/TBUserController.java

@@ -162,6 +162,9 @@ public class TBUserController {
             redisUtil.deleteUser(tbUser.getId());
             ehcacheService.removeAccountCache(tbUser.getId());
         }
+        //mq发送消息start
+        mqDtoService.assembleSendOneWayMsg(dictionaryConfig.mqConfigDomain().getUserLogTopic(), Objects.equals(authDto.getRoleEnum().name(), RoleEnum.STUDENT.name()) ? dictionaryConfig.mqConfigDomain().getUserLogTopicStudentTag() : dictionaryConfig.mqConfigDomain().getUserLogTopicUserTag(), SystemOperationEnum.LOGOUT, MqEnum.USER_LOG.name(), tbUser.getId(), tbUser.getLoginName());
+        //mq发送消息end
         return ResultUtil.ok(JSONObject.parseObject(SystemConstant.SUCCESS));
     }
 }

+ 18 - 3
themis-backend/src/main/java/com/qmth/themis/backend/quartz/MqJob.java

@@ -3,6 +3,7 @@ package com.qmth.themis.backend.quartz;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.dto.MqDto;
 import com.qmth.themis.business.service.ProducerServer;
+import com.qmth.themis.business.threadPool.MyThreadPool;
 import com.qmth.themis.business.util.RedisUtil;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
@@ -30,12 +31,26 @@ public class MqJob extends QuartzJobBean {
     @Resource
     ProducerServer producerServer;
 
+    @Resource
+    MyThreadPool myThreadPool;
+
     @Override
     protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
-        Long size = redisUtil.getHashSize(SystemConstant.SESSION_TOPIC_BUFFER_LIST);
+        myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
+            log.info("session_topic_job进来了");
+            this.assembleJob(SystemConstant.SESSION_TOPIC_BUFFER_LIST);
+        });
+        myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
+            log.info("userLog_topic_job进来了");
+            this.assembleJob(SystemConstant.USERLOG_TOPIC_BUFFER_LIST);
+        });
+    }
+
+    public void assembleJob(String redisKey) {
+        Long size = redisUtil.getHashSize(redisKey);
         if (Objects.nonNull(size) && size.longValue() > 0) {
-            log.info("session_topic缓冲区的消息数为:{}", size);
-            Map map = redisUtil.getHashEntries(SystemConstant.SESSION_TOPIC_BUFFER_LIST);
+            log.info("redisKey:{}缓冲区的消息数为:{}", redisKey, size);
+            Map map = redisUtil.getHashEntries(redisKey);
             map.forEach((k, v) -> {
                 MqDto mqDto = (MqDto) v;
                 producerServer.sendOneWay(mqDto);

+ 14 - 7
themis-business/src/main/java/com/qmth/themis/business/service/impl/MqDtoServiceImpl.java

@@ -37,18 +37,25 @@ public class MqDtoServiceImpl implements MqDtoService {
     public MqDto assembleSendOneWayMsg(Object... o) {
         MqDto mqDto = null;
         MqEnum mqEnum = MqEnum.valueOf(String.valueOf(o[3]));
-//            for (int i = 0; i < 100; i++) {
-//                TBSession tbSession = new TBSession(sessionId, String.valueOf(user.getId()), authDto.getRoleEnum().name(), platform.getSource(), platform.name(), deviceId, request.getLocalAddr(), token, expire);
-//                redisUtil.setUserSession(sessionId, tbSession);
-//                int random = (int) (Math.random() * Source.values().length);
+//        for (int i = 0; i < 100; i++) {
+//            int random = (int) (Math.random() * Source.values().length);
         //往mq发送消息插入会话信息
         mqDto = new MqDto(String.valueOf(o[0]), String.valueOf(o[1]), o[2], mqEnum, String.valueOf(o[4]), String.valueOf(o[5]));
-//                mqDto = new MqDto(SystemConstant.SESSION_TOPIC, Source.values()[random].name(), tbSession, MqEnum.SESSION, tbSession.getId());
-//                mqDto.setSequence(i);
+//            mqDto = new MqDto(String.valueOf(o[0]), Source.values()[random].name(), o[2], mqEnum, String.valueOf(o[4]), String.valueOf(o[5]));
+//            mqDto.setSequence(i);
         mqDto.setAck(SystemConstant.DELIVERED_ACK_TYPE);
+//            producerServer.sendOneWay(mqDto);
+//            switch (mqEnum.ordinal()) {
+//                case 0:
+//                    redisUtil.setSessionTopicList(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId(), mqDto);
+//                    break;
+//                default:
+//                    redisUtil.setSessionTopicList(SystemConstant.USERLOG_TOPIC_BUFFER_LIST, mqDto.getId(), mqDto);
+//                    break;
+//            }
+//        }
         try {
             producerServer.sendOneWay(mqDto);
-//            }
         } catch (Exception e) {
             e.printStackTrace();
             if (Objects.nonNull(mqDto)) {