wangliang 4 år sedan
förälder
incheckning
d94215dad8

+ 31 - 0
themis-exam/src/main/java/com/qmth/themis/exam/listener/service/MqOeLogicService.java

@@ -0,0 +1,31 @@
+package com.qmth.themis.exam.listener.service;
+
+import com.qmth.themis.mq.dto.MqDto;
+
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * @Description: mq执行逻辑
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/31
+ */
+public interface MqOeLogicService {
+
+    /**
+     * mq最大重试次数逻辑
+     *
+     * @param mqDto
+     * @param key
+     */
+    public void execMqMaxReconsumeTime(MqDto mqDto, String key);
+
+    /**
+     * oe逻辑
+     *
+     * @param mqDto
+     * @param key
+     */
+    public void execMqOqLogic(MqDto mqDto, String key) throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException;
+}

+ 127 - 0
themis-exam/src/main/java/com/qmth/themis/exam/listener/service/impl/MqOeLogicServiceImpl.java

@@ -0,0 +1,127 @@
+package com.qmth.themis.exam.listener.service.impl;
+
+import com.alibaba.fastjson.JSONArray;
+import com.google.gson.Gson;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.dto.WebsocketDto;
+import com.qmth.themis.business.entity.TMRocketMessage;
+import com.qmth.themis.business.enums.MqEnum;
+import com.qmth.themis.business.service.TMRocketMessageService;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.exam.listener.service.MqOeLogicService;
+import com.qmth.themis.exam.websocket.WebSocketOeServer;
+import com.qmth.themis.mq.dto.MqDto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.Resource;
+import java.lang.reflect.InvocationTargetException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @Description: mq执行逻辑
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/31
+ */
+@Service
+public class MqOeLogicServiceImpl implements MqOeLogicService {
+    private final static Logger log = LoggerFactory.getLogger(MqOeLogicServiceImpl.class);
+
+    @Resource
+    RedisUtil redisUtil;
+
+    @Resource
+    TMRocketMessageService tmRocketMessageService;
+
+    /**
+     * mq最大重试次数逻辑
+     *
+     * @param mqDto
+     * @param key
+     */
+    @Override
+    @Transactional
+    public void execMqMaxReconsumeTime(MqDto mqDto, String key) {
+        //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
+        Gson gson = new Gson();
+        mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
+        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
+        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+        redisUtil.delete(key, mqDto.getId());
+    }
+
+    /**
+     * oe逻辑
+     *
+     * @param mqDto
+     * @param key
+     * @throws ClassNotFoundException
+     * @throws IllegalAccessException
+     * @throws InstantiationException
+     * @throws NoSuchMethodException
+     * @throws InvocationTargetException
+     */
+    @Override
+    @Transactional
+    public void execMqOqLogic(MqDto mqDto, String key) throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+        Gson gson = new Gson();
+        MqEnum mqEnum = mqDto.getType();
+        ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
+        if (MqEnum.WEBSOCKET_OFFLINE_LOG.ordinal() == mqEnum.ordinal()) {//下线
+            JSONArray jsonArray = JSONArray.parseArray(String.valueOf(mqDto.getBody()));
+            Set<String> examStudentIdentitySet = jsonArray.toJavaObject(Set.class);
+            log.info("examStudentIdentitySet:{}", JacksonUtil.parseJson(examStudentIdentitySet));
+            webSocketMap.forEach((k, v) -> {
+                examStudentIdentitySet.forEach(s -> {
+                    if (k.contains(s)) {
+                        Map map = new HashMap<>();
+                        map.put("offLineId", k);
+                        WebsocketDto websocketDto = new WebsocketDto("offLine", map);
+                        v.sendMessage(websocketDto);
+                    }
+                });
+            });
+        } else if (MqEnum.WEBSOCKET_IM_CLUSTERING_LOG.ordinal() == mqEnum.ordinal()) {//点对点消息
+            JSONArray jsonArray = JSONArray.parseArray(String.valueOf(mqDto.getBody()));
+            Set<String> examStudentIdentitySet = jsonArray.toJavaObject(Set.class);
+            log.info("examStudentIdentitySet:{}", JacksonUtil.parseJson(examStudentIdentitySet));
+            webSocketMap.forEach((k, v) -> {
+                examStudentIdentitySet.forEach(s -> {
+                    if (k.contains(s)) {
+                        Map map = new HashMap<>();
+                        map.put("message", k);
+                        WebsocketDto websocketDto = new WebsocketDto("message", map);
+                        v.sendMessage(websocketDto);
+                        return;
+                    }
+                });
+            });
+        } else if (MqEnum.WEBSOCKET_IM_BROADCASTING_LOG.ordinal() == mqEnum.ordinal()) {//广播消息
+            JSONArray jsonArray = JSONArray.parseArray(String.valueOf(mqDto.getBody()));
+            Set<String> examStudentIdentitySet = jsonArray.toJavaObject(Set.class);
+            log.info("examStudentIdentitySet:{}", JacksonUtil.parseJson(examStudentIdentitySet));
+            webSocketMap.forEach((k, v) -> {
+                examStudentIdentitySet.forEach(s -> {
+                    if (k.contains(s)) {
+                        Map map = new HashMap<>();
+                        map.put("message", k);
+                        WebsocketDto websocketDto = new WebsocketDto("message", map);
+                        v.sendMessage(websocketDto);
+                    }
+                });
+            });
+        }
+        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
+        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+        redisUtil.delete(key, mqDto.getId());
+    }
+}

+ 25 - 79
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketOeServer.java

@@ -6,11 +6,9 @@ import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.dto.WebsocketDto;
 import com.qmth.themis.business.entity.TBSession;
-import com.qmth.themis.business.entity.TMRocketMessage;
 import com.qmth.themis.business.enums.MqEnum;
 import com.qmth.themis.business.enums.SystemOperationEnum;
 import com.qmth.themis.business.enums.WebsocketTypeEnum;
-import com.qmth.themis.business.service.TMRocketMessageService;
 import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.business.util.WebsocketUtil;
@@ -19,32 +17,21 @@ import com.qmth.themis.common.enums.ExceptionResultEnum;
 import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.common.signature.SignatureInfo;
 import com.qmth.themis.common.signature.SignatureType;
+import com.qmth.themis.exam.listener.service.MqOeLogicService;
 import com.qmth.themis.exam.websocketTemplete.WebSocketOeMessageTemplete;
 import com.qmth.themis.mq.dto.MqDto;
 import com.qmth.themis.mq.enums.MqTagEnum;
 import com.qmth.themis.mq.enums.MqTopicEnum;
 import com.qmth.themis.mq.service.MqDtoService;
 import com.qmth.themis.mq.templete.Concurrently;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
-import org.apache.rocketmq.spring.annotation.SelectorType;
-import org.apache.rocketmq.spring.core.RocketMQListener;
-import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
 
 import javax.websocket.*;
-import javax.websocket.server.PathParam;
 import javax.websocket.server.ServerEndpoint;
 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -125,21 +112,15 @@ public class WebSocketOeServer implements Concurrently {
                         addOnlineCount();
                         //在线数加1
                     }
-                    //发送恢复网络mq消息
                     log.info("用户连接:" + this.sessionId + ",当前在线人数为:" + getOnlineCount());
-                    try {
-                        InetSocketAddress addr = (InetSocketAddress) WebsocketUtil.getFieldInstance(this.session.getAsyncRemote(), "base#socketWrapper#socket#sc#remoteAddress");
-                        this.ip = addr.toString().replace("/", "").split(":")[0];
-                        this.sendMessage("ip[" + this.ip + "]连接成功");
-                        tranMap = new HashMap<>();
-                        tranMap.put("recordId", this.recordId);
-                        tranMap.put("deviceId", this.deviceId);
-                        tranMap.put("ip", this.ip);
-                        tranMap.put("updateTime", this.updateTime);
-                    } catch (IOException e) {
-                        e.printStackTrace();
-                        log.error("用户:" + this.sessionId + ",网络异常!!!!!!");
-                    }
+                    InetSocketAddress addr = (InetSocketAddress) WebsocketUtil.getFieldInstance(this.session.getAsyncRemote(), "base#socketWrapper#socket#sc#remoteAddress");
+                    this.ip = addr.toString().replace("/", "").split(":")[0];
+                    this.sendMessage("ip[" + this.ip + "]连接成功");
+                    tranMap = new HashMap<>();
+                    tranMap.put("recordId", this.recordId);
+                    tranMap.put("deviceId", this.deviceId);
+                    tranMap.put("ip", this.ip);
+                    tranMap.put("updateTime", this.updateTime);
                 } else {
                     throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
                 }
@@ -233,24 +214,12 @@ public class WebSocketOeServer implements Concurrently {
      * @param message
      * @throws IOException
      */
-    public void sendMessage(String message) throws IOException {
-        log.info("message:{}", message);
-        this.session.getAsyncRemote().sendText(message);
+    public void sendMessage(Object message) {
+        log.info("message:{}", JacksonUtil.parseJson(message));
+        this.session.getAsyncRemote().sendText(JacksonUtil.parseJson(message));
         this.updateTime = System.currentTimeMillis();
     }
 
-    /**
-     * 发送自定义消息
-     */
-    public static void sendInfo(String message, @PathParam("sessionId") String sessionId) throws IOException {
-        log.info("发送消息到:{},报文:{}", sessionId, message);
-        if (Objects.nonNull(sessionId) && webSocketMap.containsKey(sessionId)) {
-            webSocketMap.get(sessionId).sendMessage(message);
-        } else {
-            log.error("用户[:{}]不在线!", sessionId);
-        }
-    }
-
     /**
      * 获取在线人数
      *
@@ -294,58 +263,27 @@ public class WebSocketOeServer implements Concurrently {
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
-        TMRocketMessageService tmRocketMessageService = SpringContextHolder.getBean(TMRocketMessageService.class);
+        MqOeLogicService mqOeLogicService = SpringContextHolder.getBean(MqOeLogicService.class);
         MqDto mqDto = null;
         try {
             long threadId = Thread.currentThread().getId();
             String threadName = Thread.currentThread().getName();
-            Gson gson = new Gson();
             for (MessageExt messageExt : msgs) {
                 log.info(":{}-:{} websocket oe Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
                 mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                 log.info(":{}-:{} websocket oe Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
                 int reconsumeTime = messageExt.getReconsumeTimes();
                 if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
-                    //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
-                    mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
-                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                    tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                    redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
+                    mqOeLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                        MqEnum mqEnum = mqDto.getType();
-                        if (MqEnum.WEBSOCKET_OFFLINE_LOG.ordinal() == mqEnum.ordinal()) {//下线
-
-                        } else if (MqEnum.WEBSOCKET_IM_CLUSTERING_LOG.ordinal() == mqEnum.ordinal()) {//点对点消息
-
-                        } else if (MqEnum.WEBSOCKET_IM_BROADCASTING_LOG.ordinal() == mqEnum.ordinal()) {//广播消息
-
-                        }
-                    }else {
+                        mqOeLogicService.execMqOqLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    } else {
                         log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
                         return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
                     }
                 }
-//                log.info(":{}-:{} websocketConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
-//                MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
-//                log.info(":{}-:{} websocketConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
-//                log.info(":{}-:{} websocketConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
-//                Map map = mqDto.getProperties();
-//                String body = JacksonUtil.parseJson(mqDto.getBody());
-//                log.info("map:{},body:{}", JacksonUtil.parseJson(map), body);
-//                String model = String.valueOf(map.get("model"));
-//                MessageModel messageModel = MessageModel.valueOf(model);
-//                if (messageModel.ordinal() == MessageModel.CLUSTERING.ordinal()) {
-//                    webSocketMap.get(map.get("toUserId")).sendMessage(body);
-//                } else {
-//                    webSocketMap.forEach((k, v) -> {
-//                        try {
-//                            v.sendMessage(body);
-//                        } catch (IOException e) {
-//                            e.printStackTrace();
-//                        }
-//                    });
-//                }
             }
         } catch (Exception e) {
             e.printStackTrace();
@@ -358,5 +296,13 @@ public class WebSocketOeServer implements Concurrently {
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
     }
+
+    public static ConcurrentHashMap<String, WebSocketOeServer> getWebSocketMap() {
+        return webSocketMap;
+    }
+
+    public static void setWebSocketMap(ConcurrentHashMap<String, WebSocketOeServer> webSocketMap) {
+        WebSocketOeServer.webSocketMap = webSocketMap;
+    }
 }
 

+ 2 - 4
themis-exam/src/main/java/com/qmth/themis/exam/websocketTemplete/WebSocketOeMessageTemplete.java

@@ -98,8 +98,7 @@ public class WebSocketOeMessageTemplete {
     public WebsocketDto syncAck() {
         Map map = new HashMap<>();
         map.put(SystemConstant.ACK_MESSAGE, System.currentTimeMillis());
-        WebsocketDto websocketDto = new WebsocketDto(Thread.currentThread().getStackTrace()[1].getMethodName(), map);
-        return websocketDto;
+        return new WebsocketDto(Thread.currentThread().getStackTrace()[1].getMethodName(), map);
     }
 
     /**
@@ -130,8 +129,7 @@ public class WebSocketOeMessageTemplete {
     public WebsocketDto invigilateNoticeAck() {
         Map map = new HashMap<>();
         map.put(SystemConstant.ACK_MESSAGE, System.currentTimeMillis());
-        WebsocketDto websocketDto = new WebsocketDto(this.getClass().getEnclosingMethod().getName(), map);
-        return websocketDto;
+        return new WebsocketDto(this.getClass().getEnclosingMethod().getName(), map);
     }
 
     /**

+ 4 - 0
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqLogicServiceImpl.java

@@ -84,6 +84,7 @@ public class MqLogicServiceImpl implements MqLogicService {
         Gson gson = new Gson();
         mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
         TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
         redisUtil.delete(key, mqDto.getId());
     }
@@ -106,6 +107,7 @@ public class MqLogicServiceImpl implements MqLogicService {
         }
         mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
         TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
         redisUtil.delete(key, mqDto.getId());
     }
@@ -170,6 +172,7 @@ public class MqLogicServiceImpl implements MqLogicService {
         mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
         mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
         TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
         redisUtil.delete(key, mqDto.getId());
     }
@@ -222,6 +225,7 @@ public class MqLogicServiceImpl implements MqLogicService {
         Map map = new HashMap();
         map.put(SystemConstant.MQDTO_OBJ, JacksonUtil.parseJson(mqDto));
         tmRocketMessage.setProp(JacksonUtil.parseJson(map));
+        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
         redisUtil.delete(key, mqDto.getId());
     }

+ 2 - 0
themis-task/src/main/java/com/qmth/themis/task/listener/service/impl/MqTaskLogicServiceImpl.java

@@ -59,6 +59,7 @@ public class MqTaskLogicServiceImpl implements MqTaskLogicService {
         Gson gson = new Gson();
         mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
         TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
         redisUtil.delete(key, mqDto.getId());
     }
@@ -114,6 +115,7 @@ public class MqTaskLogicServiceImpl implements MqTaskLogicService {
             }
         }
         TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
         redisUtil.delete(key, mqDto.getId());
     }

+ 2 - 1
themis-task/src/main/java/com/qmth/themis/task/quartz/service/impl/QuartzLogicServiceImpl.java

@@ -10,6 +10,7 @@ import com.qmth.themis.business.enums.MqEnum;
 import com.qmth.themis.business.service.TEExamActivityService;
 import com.qmth.themis.business.service.TEExamStudentService;
 import com.qmth.themis.business.service.TOeExamRecordService;
+import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.mq.dto.MqDto;
 import com.qmth.themis.mq.enums.MqTagEnum;
 import com.qmth.themis.mq.enums.MqTopicEnum;
@@ -96,7 +97,7 @@ public class QuartzLogicServiceImpl implements QuartzLogicService {
                 });
                 //加入踢下线mq
                 teExamStudentService.updateBatchById(teExamStudentList);
-                MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.oe.name(), finalExamStudentIdentityList, MqEnum.WEBSOCKET_OFFLINE_LOG, String.valueOf(teExamActivity.getId()), teExamActivity.getCode());
+                MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.oe.name(), JacksonUtil.parseJson(finalExamStudentIdentityList), MqEnum.WEBSOCKET_OFFLINE_LOG, String.valueOf(teExamActivity.getId()), teExamActivity.getCode());
                 //发送强行离线mq start
                 mqDtoService.assembleSendOneWayMsg(mqDto);
                 //发送强行离线mq end