|
@@ -13,10 +13,9 @@ import com.qmth.themis.business.service.MqDtoService;
|
|
|
import com.qmth.themis.business.service.TOeExamRecordService;
|
|
|
import com.qmth.themis.business.util.*;
|
|
|
import com.qmth.themis.common.contanst.Constants;
|
|
|
-import com.qmth.themis.common.enums.ExceptionResultEnum;
|
|
|
-import com.qmth.themis.common.enums.Platform;
|
|
|
import com.qmth.themis.common.exception.BusinessException;
|
|
|
import com.qmth.themis.exam.listener.service.MqOeLogicService;
|
|
|
+import com.qmth.themis.exam.websocket.interceptor.WebSocketMobileHandshakeInterceptor;
|
|
|
import com.qmth.themis.exam.websocketTemplete.WebSocketMobileMessageTemplete;
|
|
|
import com.qmth.themis.mq.templete.Concurrently;
|
|
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
@@ -44,7 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
* @Author: wangliang
|
|
|
* @Date: 2020/7/10
|
|
|
*/
|
|
|
-@ServerEndpoint("/ws/mobile")
|
|
|
+@ServerEndpoint(value = "/ws/mobile", configurator = WebSocketMobileHandshakeInterceptor.class)
|
|
|
@Component
|
|
|
public class WebSocketMobileServer implements Concurrently {
|
|
|
private final static Logger log = LoggerFactory.getLogger(WebSocketMobileServer.class);
|
|
@@ -53,9 +52,8 @@ public class WebSocketMobileServer implements Concurrently {
|
|
|
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
|
|
|
*/
|
|
|
private Session session = null;
|
|
|
- private Platform platform = null;
|
|
|
- private String sessionId = null, ip = null, deviceId = null, authorization = null, url = "/ws/mobile", websocketSessionId = null;
|
|
|
- private Long time = null, recordId = null, updateTime = null;
|
|
|
+ private String sessionId = null, ip = null, deviceId = null, websocketSessionId = null;
|
|
|
+ private Long recordId = null, updateTime = null;
|
|
|
private RedisUtil redisUtil;
|
|
|
private Map<String, Object> tranMap = null;
|
|
|
private MonitorVideoSourceEnum source = null;
|
|
@@ -65,28 +63,11 @@ public class WebSocketMobileServer implements Concurrently {
|
|
|
*/
|
|
|
@OnOpen
|
|
|
public void onOpen(Session session) {
|
|
|
- Map<String, List<String>> mapParameter = session.getRequestParameterMap();
|
|
|
- if (Objects.isNull(mapParameter)) {
|
|
|
- throw new BusinessException(ExceptionResultEnum.PARAMS_ILLEGALITY);
|
|
|
- }
|
|
|
- log.info("mapParameter:{}", JacksonUtil.parseJson(mapParameter));
|
|
|
- log.info("uri:{}", session.getRequestURI());
|
|
|
- if (Objects.isNull(mapParameter.get(Constants.HEADER_PLATFORM)) || mapParameter.get(Constants.HEADER_PLATFORM).size() == 0) {
|
|
|
- throw new BusinessException(ExceptionResultEnum.PLATFORM_INVALID);
|
|
|
- }
|
|
|
- this.platform = Platform.valueOf(mapParameter.get(Constants.HEADER_PLATFORM).get(0));
|
|
|
- if (Objects.isNull(mapParameter.get(Constants.HEADER_DEVICE_ID)) || mapParameter.get(Constants.HEADER_DEVICE_ID).size() == 0) {
|
|
|
- throw new BusinessException(ExceptionResultEnum.DEVICE_ID_INVALID);
|
|
|
- }
|
|
|
- this.deviceId = mapParameter.get(Constants.HEADER_DEVICE_ID).get(0);
|
|
|
- this.authorization = mapParameter.get(Constants.HEADER_AUTHORIZATION).get(0);
|
|
|
- this.time = Long.parseLong(mapParameter.get(Constants.HEADER_TIME).get(0));
|
|
|
- this.recordId = Long.parseLong(mapParameter.get(Constants.HEADER_RECORD_ID).get(0));
|
|
|
- this.source = MonitorVideoSourceEnum.valueOf(mapParameter.get(Constants.HEADER_SOURCE).get(0));
|
|
|
-
|
|
|
this.redisUtil = SpringContextHolder.getBean(RedisUtil.class);
|
|
|
- TBSession tbSession = AuthUtil.websocketAuthInterceptor(this.platform, this.deviceId, this.authorization, this.time, SystemConstant.GET, this.url);
|
|
|
- SystemConstant.checkExamStatus(this.recordId);
|
|
|
+ this.recordId = (Long) session.getUserProperties().get(Constants.HEADER_RECORD_ID);
|
|
|
+ this.deviceId = (String) session.getUserProperties().get(Constants.HEADER_DEVICE_ID);
|
|
|
+ TBSession tbSession = (TBSession) session.getUserProperties().get(Constants.HEADER_TB_SESSION);
|
|
|
+ this.source = (MonitorVideoSourceEnum) session.getUserProperties().get(Constants.HEADER_SOURCE);
|
|
|
this.session = session;
|
|
|
session.setMaxIdleTimeout(SystemConstant.WEBSOCKET_MAX_TIME_OUT);
|
|
|
this.sessionId = tbSession.getId();
|
|
@@ -96,10 +77,9 @@ public class WebSocketMobileServer implements Concurrently {
|
|
|
webSocketMap.put(this.websocketSessionId + "-" + this.source.name(), this);
|
|
|
} else {
|
|
|
webSocketMap.put(this.websocketSessionId + "-" + this.source.name(), this);
|
|
|
-// addOnlineCount();
|
|
|
+ addOnlineCount();
|
|
|
}
|
|
|
-// log.info("用户连接:{},当前在线人数为:{}", this.sessionId, getOnlineCount());
|
|
|
- log.info("用户连接:{}", this.websocketSessionId);
|
|
|
+ log.info("用户连接:{},当前在线人数为:{}", this.websocketSessionId, getOnlineCount());
|
|
|
InetSocketAddress addr = (InetSocketAddress) WebsocketUtil.getFieldInstance(this.session.getAsyncRemote(), "base#socketWrapper#socket#sc#remoteAddress");
|
|
|
this.ip = addr.toString().replace("/", "").split(":")[0];
|
|
|
log.info("ip[:{}]连接成功", this.ip);
|
|
@@ -121,7 +101,7 @@ public class WebSocketMobileServer implements Concurrently {
|
|
|
if (webSocketMap.containsKey(this.websocketSessionId + "-" + this.source.name())) {
|
|
|
webSocketMap.remove(this.websocketSessionId + "-" + this.source.name());
|
|
|
//从set中删除
|
|
|
-// subOnlineCount();
|
|
|
+ subOnlineCount();
|
|
|
//判断是否是正常退出
|
|
|
Long timestamp = System.currentTimeMillis();
|
|
|
ExamRecordCacheUtil.setMonitorStatus(this.recordId, this.source, MonitorStatusSourceEnum.STOP, timestamp);
|
|
@@ -141,8 +121,7 @@ public class WebSocketMobileServer implements Concurrently {
|
|
|
MqDto mqDtoStop = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.MONITOR_STOP.name(), recordId, MqTagEnum.MONITOR_STOP, String.valueOf(recordId), mqMap, String.valueOf(recordId));
|
|
|
mqDtoService.assembleSendOneOrderMsg(mqDtoStop);
|
|
|
}
|
|
|
-// log.info("用户退出:{},当前在线人数为:{},updateTime:{}", this.sessionId, getOnlineCount(), this.updateTime);
|
|
|
- log.info("用户退出:{},updateTime:{}", this.websocketSessionId, this.updateTime);
|
|
|
+ log.info("用户退出:{},当前在线人数为:{},updateTime:{}", this.sessionId, getOnlineCount(), this.updateTime);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -200,50 +179,80 @@ public class WebSocketMobileServer implements Concurrently {
|
|
|
this.updateTime = System.currentTimeMillis();
|
|
|
}
|
|
|
|
|
|
-// /**
|
|
|
-// * 获取在线人数
|
|
|
-// *
|
|
|
-// * @return
|
|
|
-// */
|
|
|
-// public synchronized int getOnlineCount() {
|
|
|
-// Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
|
|
|
-// return Objects.isNull(o) ? 0 : (int) o;
|
|
|
-// }
|
|
|
-//
|
|
|
-// /**
|
|
|
-// * 在线人数加一
|
|
|
-// */
|
|
|
-// public synchronized void addOnlineCount() {
|
|
|
-// if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
|
|
|
-// Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
|
|
|
-// int count = 0;
|
|
|
-// if (Objects.nonNull(o)) {
|
|
|
-// count = (int) o;
|
|
|
-// }
|
|
|
-// count++;
|
|
|
-// redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count);
|
|
|
-// }
|
|
|
-// }
|
|
|
-//
|
|
|
-// /**
|
|
|
-// * 在线人数减一
|
|
|
-// */
|
|
|
-// public synchronized void subOnlineCount() {
|
|
|
-// if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
|
|
|
-// Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
|
|
|
-// int count = 0;
|
|
|
-// if (Objects.nonNull(o)) {
|
|
|
-// count = (int) o;
|
|
|
-// }
|
|
|
-// count--;
|
|
|
-// redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count < 0 ? 0 : count);
|
|
|
-// }
|
|
|
-// }
|
|
|
+ /**
|
|
|
+ * 获取在线人数
|
|
|
+ *
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public synchronized int getOnlineCount() {
|
|
|
+ Object o = redisUtil.get(SystemConstant.WEBSOCKET_MOBILE_ONLINE_COUNT);
|
|
|
+ return Objects.isNull(o) ? 0 : (int) o;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 在线人数加一
|
|
|
+ */
|
|
|
+ public synchronized void addOnlineCount() {
|
|
|
+ if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
|
|
|
+ try {
|
|
|
+ Object o = redisUtil.get(SystemConstant.WEBSOCKET_MOBILE_ONLINE_COUNT);
|
|
|
+ int count = 0;
|
|
|
+ if (Objects.nonNull(o)) {
|
|
|
+ count = (int) o;
|
|
|
+ }
|
|
|
+ count++;
|
|
|
+ redisUtil.set(SystemConstant.WEBSOCKET_MOBILE_ONLINE_COUNT, count);
|
|
|
+ } finally {
|
|
|
+ if (Objects.nonNull(this.websocketSessionId)) {
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 在线人数减一
|
|
|
+ */
|
|
|
+ public synchronized void subOnlineCount() {
|
|
|
+ if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
|
|
|
+ try {
|
|
|
+ Object o = redisUtil.get(SystemConstant.WEBSOCKET_MOBILE_ONLINE_COUNT);
|
|
|
+ int count = 0;
|
|
|
+ if (Objects.nonNull(o)) {
|
|
|
+ count = (int) o;
|
|
|
+ }
|
|
|
+ count--;
|
|
|
+ redisUtil.set(SystemConstant.WEBSOCKET_MOBILE_ONLINE_COUNT, count < 0 ? 0 : count);
|
|
|
+ } finally {
|
|
|
+ if (Objects.nonNull(this.websocketSessionId)) {
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
public static ConcurrentHashMap<String, WebSocketMobileServer> getWebSocketMap() {
|
|
|
return webSocketMap;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 关闭session
|
|
|
+ *
|
|
|
+ * @param webSocketMobileServer
|
|
|
+ */
|
|
|
+ public static void close(WebSocketMobileServer webSocketMobileServer) {
|
|
|
+ //判断当前连接是否还在线
|
|
|
+ if (Objects.nonNull(webSocketMobileServer) && webSocketMobileServer.session.isOpen()) {
|
|
|
+ try {
|
|
|
+ // 关闭连接
|
|
|
+ CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "鉴权失败!");
|
|
|
+ webSocketMobileServer.session.close(closeReason);
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public Long getRecordId() {
|
|
|
return recordId;
|
|
|
}
|