소스 검색

考生端接口

wangliang 5 년 전
부모
커밋
8136b96c5f

+ 7 - 1
themis-business/src/main/java/com/qmth/themis/business/constant/SystemConstant.java

@@ -81,7 +81,9 @@ public class SystemConstant {
      * redis分布式锁
      */
     public static final String REDIS_LOCK_MQ_PREFIX = "lock:mq:";
+    public static final String REDIS_LOCK_WEBSOCKET_PREFIX = "lock:websocket:";
     public static final long REDIS_LOCK_MQ_TIME_OUT = 1L;
+    public static final long REDIS_LOCK_WEBSOCKET_TIME_OUT = 1L;
     public static final String REDIS_CACHE = "cache:task:";
     public static final long REDIS_CACHE_TIME_OUT = 30L;
     /**
@@ -112,7 +114,11 @@ public class SystemConstant {
     public static final int THREAD_POOL_MAX_POOL_SIZE = 40;
     public static final int THREAD_POOL_KEEP_ALIVE_SECONDS = 60;
     public static final int THREAD_POOL_QUEUE_CAPACITY = 100;
-
+    /**
+     * websocket
+     */
+    public static final String WEBSOCKET_OE_ONLINE_COUNT = "websocket:oe:online:count";
+    public static final String GET = "get";
 
     /**
      * 获取过期时间

+ 121 - 85
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketServer.java

@@ -2,24 +2,27 @@ package com.qmth.themis.exam.websocket;//package com.qmth.themis.backend.websock
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
+import com.qmth.themis.business.constant.SpringContextHolder;
+import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.entity.TBSession;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.common.enums.ExceptionResultEnum;
 import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.common.signature.SignatureInfo;
 import com.qmth.themis.common.signature.SignatureType;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.Resource;
 import javax.websocket.*;
 import javax.websocket.server.PathParam;
 import javax.websocket.server.ServerEndpoint;
 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @Description: websocker服务端
@@ -28,76 +31,80 @@ import java.util.concurrent.ConcurrentHashMap;
  * @Author: wangliang
  * @Date: 2020/7/10
  */
-@ServerEndpoint("/oe/{platform}/{deviceId}/{Authorization}/{time}/{method}")
+@ServerEndpoint("/oe")
 @Component
 public class WebSocketServer
 //        implements MessageListenerConcurrently
 {
     private final static Logger log = LoggerFactory.getLogger(WebSocketServer.class);
-    /**
-     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
-     */
-    private static int onlineCount = 0;
     /**
      * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
      */
-    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
+    private volatile static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
     /**
      * 与某个客户端的连接会话,需要通过它来给客户端发送数据
      */
     private Session session;
-
+    private String sessionId;
     private String platform = null;
     private String deviceId = null;
     private String Authorization = null;
     private Long time = null;
-    private String method = null;
-
-    @Resource
-    RedisUtil redisUtil;
+    private RedisUtil redisUtil;
 
     /**
      * 连接建立成功调用的方法
      */
     @OnOpen
-    public void onOpen(Session session, @PathParam("platform") String platform, @PathParam("deviceId") String deviceId, @PathParam("Authorization") String Authorization, @PathParam("time") Long time, @PathParam("method") String method) {
-        this.session = session;
-        if (Objects.isNull(platform) || Objects.equals(platform, "")) {
+    public void onOpen(Session session) {
+        Map<String, List<String>> mapParameter = session.getRequestParameterMap();
+        if (Objects.isNull(mapParameter)) {
+            throw new BusinessException(ExceptionResultEnum.PARAMS_ILLEGALITY);
+        }
+        if (Objects.isNull(mapParameter.get("platform")) || mapParameter.get("platform").size() == 0) {
             throw new BusinessException(ExceptionResultEnum.PLATFORM_INVALID);
         }
-        if (Objects.isNull(deviceId) || Objects.equals(deviceId, "")) {
+        this.platform = String.valueOf(mapParameter.get("platform").get(0));
+        if (Objects.isNull(mapParameter.get("deviceId")) || mapParameter.get("deviceId").size() == 0) {
             throw new BusinessException(ExceptionResultEnum.DEVICE_ID_INVALID);
         }
-        this.platform = platform;
-        this.deviceId = deviceId;
-        this.Authorization = Authorization;
-        this.time = time;
-        this.method = method;
+        this.deviceId = String.valueOf(mapParameter.get("deviceId").get(0));
+        this.Authorization = String.valueOf(mapParameter.get("Authorization").get(0));
+        this.time = Long.parseLong(String.valueOf(mapParameter.get("time").get(0)));
+        String method = SystemConstant.GET;
         final SignatureInfo info = SignatureInfo
                 .parse(Authorization);
         if (Objects.nonNull(info) && info.getType() == SignatureType.TOKEN) {
             String sessionId = info.getInvoker();
+            redisUtil = SpringContextHolder.getBean(RedisUtil.class);
             TBSession tbSession = (TBSession) redisUtil.getUserSession(sessionId);
-            if (info.validate(tbSession.getAccessToken()) && info.getTimestamp() < tbSession.getExpireTime().getTime()
-                    && platform.equalsIgnoreCase(tbSession.getPlatform()) && Objects.equals(deviceId, tbSession.getDeviceId())) {
-//                if (webSocketMap.containsKey(userId)) {
-//                    webSocketMap.remove(userId);
-//                    webSocketMap.put(userId, this);
-//                    //加入set中
-//                } else {
-//                    webSocketMap.put(userId, this);
-//                    //加入set中
-//                    addOnlineCount();
-//                    //在线数加1
-//                }
-//                log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());
-//                try {
-//                    sendMessage("连接成功");
-//                } catch (IOException e) {
-//                    log.error("用户:" + userId + ",网络异常!!!!!!");
-//                }
+            if (Objects.isNull(tbSession)) {
+                throw new BusinessException(ExceptionResultEnum.LOGIN_NO);
             } else {
-                throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
+                if (info.validate(tbSession.getAccessToken()) && info.getTimestamp() < tbSession.getExpireTime().getTime()
+                        && platform.equalsIgnoreCase(tbSession.getPlatform()) && Objects.equals(deviceId, tbSession.getDeviceId())) {
+                    this.session = session;
+                    this.sessionId = tbSession.getId();
+                    if (webSocketMap.containsKey(session)) {
+                        webSocketMap.remove(sessionId);
+                        webSocketMap.put(sessionId, this);
+                        //加入set中
+                    } else {
+                        webSocketMap.put(sessionId, this);
+                        //加入set中
+                        addOnlineCount();
+                        //在线数加1
+                    }
+                    log.info("用户连接:" + this.sessionId + ",当前在线人数为:" + getOnlineCount());
+                    try {
+                        sendMessage("连接成功");
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                        log.error("用户:" + this.sessionId + ",网络异常!!!!!!");
+                    }
+                } else {
+                    throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
+                }
             }
         } else {
             throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
@@ -109,57 +116,62 @@ public class WebSocketServer
      */
     @OnClose
     public void onClose() {
-//        if (webSocketMap.containsKey(userId)) {
-//            webSocketMap.remove(userId);
-//            //从set中删除
-//            subOnlineCount();
-//        }
-//        log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount());
+        if (webSocketMap.containsKey(this.sessionId)) {
+            webSocketMap.remove(this.sessionId);
+            //从set中删除
+            subOnlineCount();
+        }
+        log.info("用户退出:" + this.sessionId + ",当前在线人数为:" + getOnlineCount());
     }
 
     /**
      * 收到客户端消息后调用的方法
      *
-     * @param message 客户端发送过来的消息
+     * @param message
+     * @param session
      */
     @OnMessage
     public void onMessage(String message, Session session) {
-//        log.info("用户消息:" + userId + ",报文:" + message);
-//        //可以群发消息
-//        //消息保存到数据库、redis
-//        if (StringUtils.isNotBlank(message)) {
-//            try {
-//                //解析发送的报文
-//                JSONObject jsonObject = JSON.parseObject(message);
-//                //追加发送人(防止串改)
-//                jsonObject.put("fromUserId", this.userId);
-//                String toUserId = jsonObject.getString("toUserId");
-//                //传送给对应toUserId用户的websocket
-//                if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) {
-//                    webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
-//                } else {
-//                    log.error("请求的userId:" + toUserId + "不在该服务器上");
-//                    //否则不在这个服务器上,发送到mysql或者redis
-//                }
-//            } catch (Exception e) {
-//                e.printStackTrace();
-//            }
-//        }
+        //可以群发消息
+        //消息保存到数据库、redis
+        if (Objects.nonNull(message)) {
+            try {
+                //解析发送的报文
+                JSONObject jsonObject = JSON.parseObject(message);
+                //追加发送人(防止串改)
+                jsonObject.put("fromSessionId", this.sessionId);
+                String toSessionId = jsonObject.getString("toSessionId");
+                //传送给对应toSessionId用户的websocket
+                if (Objects.nonNull(toSessionId) && webSocketMap.containsKey(toSessionId)) {
+                    webSocketMap.get(toSessionId).sendMessage(jsonObject.toJSONString());
+                } else {
+                    log.error("请求的sessionId:" + toSessionId + "不在该服务器上");
+                    //否则不在这个服务器上,发送到mysql或者redis
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
     }
 
     /**
+     * 错误
+     *
      * @param session
      * @param error
      */
     @OnError
     public void onError(Session session, Throwable error) {
-//        log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
+        log.error("用户错误:" + this.sessionId + ",原因:" + error.getMessage());
         error.printStackTrace();
         throw new BusinessException(error.getMessage());
     }
 
     /**
      * 实现服务器主动推送
+     *
+     * @param message
+     * @throws IOException
      */
     public void sendMessage(String message) throws IOException {
         this.session.getBasicRemote().sendText(message);
@@ -168,29 +180,53 @@ public class WebSocketServer
     /**
      * 发送自定义消息
      */
-    public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException {
-        log.info("发送消息到:" + userId + ",报文:" + message);
-        if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
-            webSocketMap.get(userId).sendMessage(message);
+    public static void sendInfo(String message, @PathParam("sessionId") String sessionId) throws IOException {
+        log.info("发送消息到:" + sessionId + ",报文:" + message);
+        if (Objects.nonNull(sessionId) && webSocketMap.containsKey(sessionId)) {
+            webSocketMap.get(sessionId).sendMessage(message);
         } else {
-            log.error("用户" + userId + ",不在线!");
+            log.error("用户" + sessionId + ",不在线!");
         }
     }
 
-    public void wxappPhotoReady() {
-        log.info("wxappPhotoReady is come in");
-    }
-
-    public static synchronized int getOnlineCount() {
-        return onlineCount;
+    /**
+     * 获取在线人数
+     *
+     * @return
+     */
+    public synchronized int getOnlineCount() {
+        Object o = redisUtil.getCacheValue(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, "");
+        return Objects.isNull(o) ? 0 : (int) o;
     }
 
-    public static synchronized void addOnlineCount() {
-        WebSocketServer.onlineCount++;
+    /**
+     * 在线人数加一
+     */
+    public synchronized void addOnlineCount() {
+        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX, this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT, TimeUnit.SECONDS)) {
+            Object o = redisUtil.getCacheValue(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, "");
+            int count = 0;
+            if (Objects.nonNull(o)) {
+                count = (int) o;
+            }
+            count++;
+            redisUtil.setCacheValue(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, "", count);
+        }
     }
 
-    public static synchronized void subOnlineCount() {
-        WebSocketServer.onlineCount--;
+    /**
+     * 在线人数减一
+     */
+    public synchronized void subOnlineCount() {
+        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX, this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT, TimeUnit.SECONDS)) {
+            Object o = redisUtil.getCacheValue(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, "");
+            int count = 0;
+            if (Objects.nonNull(o)) {
+                count = (int) o;
+            }
+            count--;
+            redisUtil.setCacheValue(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, "", count < 0 ? 0 : count);
+        }
     }
 
 //    @Override