Browse Source

考生端websocket

wangliang 4 years ago
parent
commit
dcdd84e9f9

+ 4 - 0
themis-backend/pom.xml

@@ -30,6 +30,10 @@
 <!--            <groupId>org.springframework.boot</groupId>-->
 <!--            <groupId>org.springframework.boot</groupId>-->
 <!--            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>-->
 <!--            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>-->
 <!--        </dependency>-->
 <!--        </dependency>-->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
     </dependencies>
     </dependencies>
 
 
     <build>
     <build>

+ 41 - 0
themis-backend/src/main/java/com/qmth/themis/backend/config/WebSocketConfig.java

@@ -0,0 +1,41 @@
+package com.qmth.themis.backend.config;//package com.qmth.themis.backend.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+/**
+ * @Description: 开启WebSocket支持
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/6/3
+ */
+@Configuration
+@EnableWebSocket
+public class WebSocketConfig
+//        implements WebSocketConfigurer
+{
+    /**
+     * ServerEndpointExporter 作用
+     * 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
+     *
+     * @return
+     */
+    @Bean
+    public ServerEndpointExporter serverEndpointExporter() {
+        return new ServerEndpointExporter();
+    }
+
+//    @Override
+//    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+//        registry.addHandler(myHandler(), "/messageHandler").setAllowedOrigins("*");
+//    }
+//
+//    @Bean
+//    public WebSocketHandler myHandler() {
+//        return new MessageHandler();
+//    }
+}
+

+ 325 - 0
themis-backend/src/main/java/com/qmth/themis/backend/websocket/WebSocketServer.java

@@ -0,0 +1,325 @@
+package com.qmth.themis.backend.websocket;
+
+import com.alibaba.fastjson.JSONObject;
+import com.qmth.themis.backend.config.DictionaryConfig;
+import com.qmth.themis.backend.websocketTemplete.WebSocketOeMessageTemplete;
+import com.qmth.themis.business.constant.SpringContextHolder;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.entity.TBSession;
+import com.qmth.themis.business.enums.MqEnum;
+import com.qmth.themis.business.enums.SystemOperationEnum;
+import com.qmth.themis.business.enums.WebsocketTypeEnum;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.business.util.WebsocketUtil;
+import com.qmth.themis.common.enums.ExceptionResultEnum;
+import com.qmth.themis.common.exception.BusinessException;
+import com.qmth.themis.common.signature.SignatureInfo;
+import com.qmth.themis.common.signature.SignatureType;
+import com.qmth.themis.common.util.Result;
+import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.service.MqDtoService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @Description: websocker管理端
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/10
+ */
+@ServerEndpoint("/admin")
+@Component
+public class WebSocketServer
+//        implements MessageListenerConcurrently
+{
+    private final static Logger log = LoggerFactory.getLogger(WebSocketServer.class);
+    private volatile static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
+    /**
+     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
+     */
+    private Session session = null;
+    private String sessionId = null, ip = null;
+    private String platform = null, deviceId = null, Authorization = null;
+    private Long time = null;
+    private RedisUtil redisUtil;
+    private Long updateTime = null;
+    private Map<String, Object> tranMap = null;
+
+    /**
+     * 连接建立成功调用的方法
+     */
+    @OnOpen
+    public void onOpen(Session session) {
+        Map<String, List<String>> mapParameter = session.getRequestParameterMap();
+        if (Objects.isNull(mapParameter)) {
+            throw new BusinessException(ExceptionResultEnum.PARAMS_ILLEGALITY);
+        }
+        log.info("mapParameter:{}", JacksonUtil.parseJson(mapParameter));
+        log.info("uri:{}", session.getRequestURI());
+        if (Objects.isNull(mapParameter.get("platform")) || mapParameter.get("platform").size() == 0) {
+            throw new BusinessException(ExceptionResultEnum.PLATFORM_INVALID);
+        }
+        this.platform = String.valueOf(mapParameter.get("platform").get(0));
+        if (Objects.isNull(mapParameter.get("deviceId")) || mapParameter.get("deviceId").size() == 0) {
+            throw new BusinessException(ExceptionResultEnum.DEVICE_ID_INVALID);
+        }
+        this.deviceId = String.valueOf(mapParameter.get("deviceId").get(0));
+        this.Authorization = String.valueOf(mapParameter.get("Authorization").get(0));
+        this.time = Long.parseLong(String.valueOf(mapParameter.get("time").get(0)));
+        String method = SystemConstant.GET;
+        final SignatureInfo info = SignatureInfo
+                .parse(Authorization);
+        if (Objects.nonNull(info) && info.getType() == SignatureType.TOKEN) {
+            String sessionId = info.getInvoker();
+            redisUtil = SpringContextHolder.getBean(RedisUtil.class);
+            TBSession tbSession = (TBSession) redisUtil.getUserSession(sessionId);
+            if (Objects.isNull(tbSession)) {
+                throw new BusinessException(ExceptionResultEnum.LOGIN_NO);
+            } else {
+                if (info.validate(tbSession.getAccessToken()) && info.getTimestamp() < tbSession.getExpireTime().getTime()
+                        && platform.equalsIgnoreCase(tbSession.getPlatform()) && Objects.equals(deviceId, tbSession.getDeviceId())) {
+                    this.session = session;
+                    session.setMaxIdleTimeout(SystemConstant.WEBSOCKET_MAX_TIME_OUT);
+                    this.sessionId = tbSession.getId();
+                    if (webSocketMap.containsKey(tbSession.getId())) {
+                        webSocketMap.remove(sessionId);
+                        webSocketMap.put(sessionId, this);
+                        //加入set中
+                    } else {
+                        webSocketMap.put(sessionId, this);
+                        //加入set中
+                        addOnlineCount();
+                        //在线数加1
+                    }
+                    redisUtil.delete(SystemConstant.WEBSOCKET_UN_NORMAL_LIST, this.sessionId);
+                    //发送恢复网络mq消息
+                    log.info("用户连接:" + this.sessionId + ",当前在线人数为:" + getOnlineCount());
+                    try {
+                        InetSocketAddress addr = (InetSocketAddress) WebsocketUtil.getFieldInstance(this.session.getAsyncRemote(), "base#socketWrapper#socket#sc#remoteAddress");
+                        this.ip = addr.toString().replace("/", "").split(":")[0];
+                        this.sendMessage("ip[" + this.ip + "]连接成功");
+                        tranMap = new HashMap<>();
+                        tranMap.put("deviceId", this.deviceId);
+                        tranMap.put("ip", this.ip);
+                        tranMap.put("updateTime", this.updateTime);
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                        log.error("用户:" + this.sessionId + ",网络异常!!!!!!");
+                    }
+                } else {
+                    throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
+                }
+            }
+        } else {
+            throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
+        }
+    }
+
+    /**
+     * 连接关闭调用的方法
+     */
+    @OnClose
+    public void onClose() {
+        log.info("onClose is come in");
+        if (webSocketMap.containsKey(this.sessionId)) {
+            webSocketMap.remove(this.sessionId);
+            //从set中删除
+            subOnlineCount();
+            //判断是否是正常退出
+            Date now = new Date();
+            //大于等于超时时间,说明规定时间内都没有通信,非正常退出,因为期间会有心跳更新updateTime
+            if ((now.getTime() - this.updateTime) / 1000 >= SystemConstant.WEBSOCKET_MAX_TIME_OUT / 1000) {
+                log.info("超时退出");
+                redisUtil.set(SystemConstant.WEBSOCKET_UN_NORMAL_LIST, this.sessionId, this.sessionId);
+                //发送延时mq消息start
+                MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
+                DictionaryConfig dictionaryConfig = SpringContextHolder.getBean(DictionaryConfig.class);
+                String level = "2m";
+                Integer time = SystemConstant.mqDelayLevel.get(level);
+                LocalDateTime dt = LocalDateTime.now();
+                dt = dt.plusMinutes(Long.parseLong(level.replace("m", "")));
+                tranMap.put("timeOut", time);
+                tranMap.put("mqExecTime", dt.toInstant(ZoneOffset.of("+8")).toEpochMilli());
+                MqDto mqDto = new MqDto(dictionaryConfig.mqConfigDomain().getWebsocketUnNormalTopic(), dictionaryConfig.mqConfigDomain().getWebsocketUnNormalTopicOeTag(), SystemOperationEnum.OE_NET_UN_NORMAL, MqEnum.WEBSOCKET_UN_NORMAL_LOG, this.sessionId, this.tranMap, this.sessionId);
+                mqDtoService.assembleSendAsyncDelayMsg(mqDto);
+                //发送延时mq消息end
+            }
+        }
+        log.info("用户退出:{},当前在线人数为:{},updateTime:{}", this.sessionId, getOnlineCount(), this.updateTime);
+    }
+
+    /**
+     * 收到客户端消息后调用的方法
+     *
+     * @param message
+     * @param session
+     */
+    @OnMessage
+    public void onMessage(String message, Session session) {
+        //可以群发消息
+        //消息保存到数据库、redis
+        if (Objects.nonNull(message)) {
+            try {
+                //解析发送的报文
+                JSONObject jsonObject = JSONObject.parseObject(message);
+                log.info("onMessage:{}", jsonObject.toJSONString());
+                if (Objects.nonNull(jsonObject)) {
+                    WebSocketOeMessageTemplete webSocketOeMessageTemplete = SpringContextHolder.getBean(WebSocketOeMessageTemplete.class);
+                    String type = String.valueOf(jsonObject.get("type"));
+                    Long time = Long.parseLong(String.valueOf(jsonObject.get("time")));
+                    //todo 加入当前时间和time比较的校验
+                    Method method = webSocketOeMessageTemplete.getClass().getDeclaredMethod(WebsocketTypeEnum.valueOf(type).getDesc(), String.class);
+                    Result result = (Result) method.invoke(webSocketOeMessageTemplete, String.valueOf(jsonObject.get("body")));
+                    this.sendMessage(JSONObject.toJSONString(result));
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * 错误
+     *
+     * @param session
+     * @param error
+     */
+    @OnError
+    public void onError(Session session, Throwable error) {
+        log.error("用户错误:{},原因:{}", this.sessionId, error.getMessage());
+        error.printStackTrace();
+        throw new BusinessException(error.getMessage());
+    }
+
+    /**
+     * 实现服务器主动推送
+     *
+     * @param message
+     * @throws IOException
+     */
+    public void sendMessage(String message) throws IOException {
+        log.info("message:{}", message);
+        this.session.getAsyncRemote().sendText(message);
+        this.updateTime = System.currentTimeMillis();
+    }
+
+    /**
+     * 发送自定义消息
+     */
+    public static void sendInfo(String message, @PathParam("sessionId") String sessionId) throws IOException {
+        log.info("发送消息到:{},报文:{}", sessionId, message);
+        if (Objects.nonNull(sessionId) && webSocketMap.containsKey(sessionId)) {
+            webSocketMap.get(sessionId).sendMessage(message);
+        } else {
+            log.error("用户[:{}]不在线!", sessionId);
+        }
+    }
+
+    /**
+     * 获取在线人数
+     *
+     * @return
+     */
+    public synchronized int getOnlineCount() {
+        Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
+        return Objects.isNull(o) ? 0 : (int) o;
+    }
+
+    /**
+     * 在线人数加一
+     */
+    public synchronized void addOnlineCount() {
+        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
+            Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
+            int count = 0;
+            if (Objects.nonNull(o)) {
+                count = (int) o;
+            }
+            count++;
+            redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count);
+        }
+    }
+
+    /**
+     * 在线人数减一
+     */
+    public synchronized void subOnlineCount() {
+        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
+            Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
+            int count = 0;
+            if (Objects.nonNull(o)) {
+                count = (int) o;
+            }
+            count--;
+            redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count < 0 ? 0 : count);
+        }
+    }
+
+//    @Override
+//    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+//        try {
+//            long threadId = Thread.currentThread().getId();
+//            String threadName = Thread.currentThread().getName();
+//            for (MessageExt messageExt : msgs) {
+//                log.info(":{}-:{} websocketConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
+//                MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
+//                log.info(":{}-:{} websocketConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
+//                log.info(":{}-:{} websocketConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
+//                Map map = mqDto.getProperties();
+//                String body = JacksonUtil.parseJson(mqDto.getBody());
+//                log.info("map:{},body:{}", JacksonUtil.parseJson(map), body);
+//                String model = String.valueOf(map.get("model"));
+//                MessageModel messageModel = MessageModel.valueOf(model);
+//                if (messageModel.ordinal() == MessageModel.CLUSTERING.ordinal()) {
+//                    webSocketMap.get(map.get("toUserId")).sendMessage(body);
+//                } else {
+//                    webSocketMap.forEach((k, v) -> {
+//                        try {
+//                            v.sendMessage(body);
+//                        } catch (IOException e) {
+//                            e.printStackTrace();
+//                        }
+//                    });
+//                }
+//            }
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+//        }
+//        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+//    }
+//
+//    @Service
+//    @RocketMQMessageListener(consumerGroup = "websocketConsumerImGroup", topic = "websocketImTopic", selectorType = SelectorType.TAG, selectorExpression = "*")
+//    public class sessionConsumerWeb implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
+//
+//        @Override
+//        public void onMessage(Message message) {
+//            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
+//        }
+//
+//        @Override
+//        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
+//            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
+//            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+//            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
+////            defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
+//            defaultMQPushConsumer.registerMessageListener(WebSocketServer.this::consumeMessage);
+//        }
+//    }
+}
+

+ 56 - 0
themis-backend/src/main/java/com/qmth/themis/backend/websocketTemplete/WebSocketOeMessageTemplete.java

@@ -0,0 +1,56 @@
+package com.qmth.themis.backend.websocketTemplete;
+
+import com.alibaba.fastjson.JSONObject;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.entity.TOeExamRecord;
+import com.qmth.themis.business.service.TOeExamRecordService;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.common.enums.ExceptionResultEnum;
+import com.qmth.themis.common.exception.BusinessException;
+import com.qmth.themis.common.util.Result;
+import com.qmth.themis.common.util.ResultUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.Resource;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * @Description: 管理端websocket模版
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/24
+ */
+@Component
+public class WebSocketOeMessageTemplete {
+    private final static Logger log = LoggerFactory.getLogger(WebSocketOeMessageTemplete.class);
+
+    /**
+     * 状态同步
+     *
+     * @param body
+     * @return
+     */
+    @Transactional
+    public Result syncStatus(String body) {
+        JSONObject jsonObject = JSONObject.parseObject(body);
+        log.info("syncStatus jsonObject:{}", jsonObject.toJSONString());
+        return this.syncAck();
+    }
+
+    /**
+     * 同步确认
+     *
+     * @return
+     */
+    public Result syncAck() {
+        Map map = new HashMap<>();
+        map.put(SystemConstant.ACK_MESSAGE, System.currentTimeMillis());
+        return ResultUtil.ok(map);
+    }
+}

+ 2 - 2
themis-exam/src/main/java/com/qmth/themis/exam/enums/WebsocketTypeEnum.java → themis-business/src/main/java/com/qmth/themis/business/enums/WebsocketTypeEnum.java

@@ -1,4 +1,4 @@
-package com.qmth.themis.exam.enums;
+package com.qmth.themis.business.enums;
 
 
 import java.util.Objects;
 import java.util.Objects;
 
 
@@ -28,7 +28,7 @@ public enum WebsocketTypeEnum {
     private String code;
     private String code;
     private String desc;
     private String desc;
 
 
-    private WebsocketTypeEnum(String code,String desc) {
+    private WebsocketTypeEnum(String code, String desc) {
         this.code = code;
         this.code = code;
         this.desc = desc;
         this.desc = desc;
     }
     }

+ 3 - 3
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketServer.java

@@ -1,4 +1,4 @@
-package com.qmth.themis.exam.websocket;//package com.qmth.themis.backend.websocket;
+package com.qmth.themis.exam.websocket;
 
 
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.JSONObject;
 import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SpringContextHolder;
@@ -6,6 +6,7 @@ import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.entity.TBSession;
 import com.qmth.themis.business.entity.TBSession;
 import com.qmth.themis.business.enums.MqEnum;
 import com.qmth.themis.business.enums.MqEnum;
 import com.qmth.themis.business.enums.SystemOperationEnum;
 import com.qmth.themis.business.enums.SystemOperationEnum;
+import com.qmth.themis.business.enums.WebsocketTypeEnum;
 import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.business.util.WebsocketUtil;
 import com.qmth.themis.business.util.WebsocketUtil;
@@ -15,7 +16,6 @@ import com.qmth.themis.common.signature.SignatureInfo;
 import com.qmth.themis.common.signature.SignatureType;
 import com.qmth.themis.common.signature.SignatureType;
 import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.Result;
 import com.qmth.themis.exam.config.DictionaryConfig;
 import com.qmth.themis.exam.config.DictionaryConfig;
-import com.qmth.themis.exam.enums.WebsocketTypeEnum;
 import com.qmth.themis.exam.websocketTemplete.WebSocketOeMessageTemplete;
 import com.qmth.themis.exam.websocketTemplete.WebSocketOeMessageTemplete;
 import com.qmth.themis.mq.dto.MqDto;
 import com.qmth.themis.mq.dto.MqDto;
 import com.qmth.themis.mq.service.MqDtoService;
 import com.qmth.themis.mq.service.MqDtoService;
@@ -35,7 +35,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 
 /**
 /**
- * @Description: websocker服务
+ * @Description: websocker考生
  * @Param:
  * @Param:
  * @return:
  * @return:
  * @Author: wangliang
  * @Author: wangliang