|
@@ -56,14 +56,10 @@ public class WebSocketOeServer implements Concurrently {
|
|
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
|
|
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
|
|
*/
|
|
*/
|
|
private Session session = null;
|
|
private Session session = null;
|
|
- private String sessionId = null, ip = null;
|
|
|
|
- private Long recordId = null;
|
|
|
|
- private String platform = null, deviceId = null, authorization = null;
|
|
|
|
- private Long time = null;
|
|
|
|
|
|
+ private String sessionId = null, ip = null, platform = null, deviceId = null, authorization = null, url = "/ws/oe", websocketSessionId = null;
|
|
|
|
+ private Long recordId = null, time = null, updateTime = null;
|
|
private RedisUtil redisUtil;
|
|
private RedisUtil redisUtil;
|
|
- private Long updateTime = null;
|
|
|
|
private Map<String, Object> tranMap = null;
|
|
private Map<String, Object> tranMap = null;
|
|
- private String url = "/ws/oe";
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
* 连接建立成功调用的方法
|
|
* 连接建立成功调用的方法
|
|
@@ -94,20 +90,21 @@ public class WebSocketOeServer implements Concurrently {
|
|
this.session = session;
|
|
this.session = session;
|
|
session.setMaxIdleTimeout(SystemConstant.WEBSOCKET_MAX_TIME_OUT);
|
|
session.setMaxIdleTimeout(SystemConstant.WEBSOCKET_MAX_TIME_OUT);
|
|
this.sessionId = tbSession.getId();
|
|
this.sessionId = tbSession.getId();
|
|
- if (webSocketMap.containsKey(this.session.getId())) {
|
|
|
|
- webSocketMap.remove(this.session.getId());
|
|
|
|
- webSocketMap.put(this.session.getId(), this);
|
|
|
|
|
|
+ websocketSessionId = String.valueOf(UidUtil.nextId());
|
|
|
|
+ if (webSocketMap.containsKey(this.websocketSessionId)) {
|
|
|
|
+ webSocketMap.remove(this.websocketSessionId);
|
|
|
|
+ webSocketMap.put(this.websocketSessionId, this);
|
|
} else {
|
|
} else {
|
|
- webSocketMap.put(this.session.getId(), this);
|
|
|
|
|
|
+ webSocketMap.put(this.websocketSessionId, this);
|
|
addOnlineCount();
|
|
addOnlineCount();
|
|
}
|
|
}
|
|
- log.info("用户连接:{},当前在线人数为:{}", this.sessionId, getOnlineCount());
|
|
|
|
|
|
+ log.info("用户连接:{},当前在线人数为:{}", this.websocketSessionId, getOnlineCount());
|
|
InetSocketAddress addr = (InetSocketAddress) WebsocketUtil.getFieldInstance(this.session.getAsyncRemote(), "base#socketWrapper#socket#sc#remoteAddress");
|
|
InetSocketAddress addr = (InetSocketAddress) WebsocketUtil.getFieldInstance(this.session.getAsyncRemote(), "base#socketWrapper#socket#sc#remoteAddress");
|
|
this.ip = addr.toString().replace("/", "").split(":")[0];
|
|
this.ip = addr.toString().replace("/", "").split(":")[0];
|
|
log.info("ip[:{}]连接成功", this.ip);
|
|
log.info("ip[:{}]连接成功", this.ip);
|
|
- WebsocketUtil.updateExamRecordWebsocketStatus(recordId, ip, this.session.getId(), WebsocketStatusEnum.ON_LINE);
|
|
|
|
|
|
+ WebsocketUtil.updateExamRecordWebsocketStatus(this.recordId, this.ip, this.websocketSessionId, WebsocketStatusEnum.ON_LINE);
|
|
this.updateTime = System.currentTimeMillis();
|
|
this.updateTime = System.currentTimeMillis();
|
|
- tranMap = WebsocketUtil.initWebsocket(recordId, null, deviceId, ip, updateTime);
|
|
|
|
|
|
+ tranMap = WebsocketUtil.initWebsocket(this.recordId, null, this.deviceId, this.ip, this.updateTime);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -116,12 +113,12 @@ public class WebSocketOeServer implements Concurrently {
|
|
@OnClose
|
|
@OnClose
|
|
public void onClose() {
|
|
public void onClose() {
|
|
log.info("onClose is come in");
|
|
log.info("onClose is come in");
|
|
- if (webSocketMap.containsKey(this.session.getId())) {
|
|
|
|
- webSocketMap.remove(this.session.getId());
|
|
|
|
|
|
+ if (webSocketMap.containsKey(this.websocketSessionId)) {
|
|
|
|
+ webSocketMap.remove(this.websocketSessionId);
|
|
//从set中删除
|
|
//从set中删除
|
|
subOnlineCount();
|
|
subOnlineCount();
|
|
//判断是否是正常退出
|
|
//判断是否是正常退出
|
|
- ExamRecordCacheUtil.setClientWebsocketStatus(recordId, WebsocketStatusEnum.OFF_LINE, true);
|
|
|
|
|
|
+ ExamRecordCacheUtil.setClientWebsocketStatus(this.recordId, WebsocketStatusEnum.OFF_LINE, true);
|
|
ExamRecordStatusEnum status = ExamRecordCacheUtil.getStatus(this.recordId);
|
|
ExamRecordStatusEnum status = ExamRecordCacheUtil.getStatus(this.recordId);
|
|
if (Objects.equals(status, ExamRecordStatusEnum.ANSWERING)) {
|
|
if (Objects.equals(status, ExamRecordStatusEnum.ANSWERING)) {
|
|
//大于等于超时时间,说明规定时间内都没有通信,非正常退出,因为期间会有心跳更新updateTime
|
|
//大于等于超时时间,说明规定时间内都没有通信,非正常退出,因为期间会有心跳更新updateTime
|
|
@@ -131,7 +128,7 @@ public class WebSocketOeServer implements Concurrently {
|
|
MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
|
|
MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
|
|
tranMap = mqDtoService.buildMqDelayMsg("2m");
|
|
tranMap = mqDtoService.buildMqDelayMsg("2m");
|
|
MqUtil mqUtil = SpringContextHolder.getBean(MqUtil.class);
|
|
MqUtil mqUtil = SpringContextHolder.getBean(MqUtil.class);
|
|
- MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_UN_NORMAL.name(), MqTagEnum.OE_UN_NORMAL, MqTagEnum.OE_UN_NORMAL, String.valueOf(this.recordId), this.tranMap, this.sessionId);
|
|
|
|
|
|
+ MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_UN_NORMAL.name(), MqTagEnum.OE_UN_NORMAL, MqTagEnum.OE_UN_NORMAL, String.valueOf(this.recordId), this.tranMap, this.websocketSessionId);
|
|
mqDtoService.assembleSendAsyncDelayMsg(mqDto);
|
|
mqDtoService.assembleSendAsyncDelayMsg(mqDto);
|
|
//发送延时mq消息end
|
|
//发送延时mq消息end
|
|
} else {
|
|
} else {
|
|
@@ -142,7 +139,7 @@ public class WebSocketOeServer implements Concurrently {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- log.info("用户退出:{},当前在线人数为:{},updateTime:{}", this.sessionId, getOnlineCount(), this.updateTime);
|
|
|
|
|
|
+ log.info("用户退出:{},当前在线人数为:{},updateTime:{}", this.websocketSessionId, getOnlineCount(), this.updateTime);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -187,7 +184,7 @@ public class WebSocketOeServer implements Concurrently {
|
|
*/
|
|
*/
|
|
@OnError
|
|
@OnError
|
|
public void onError(Session session, Throwable error) {
|
|
public void onError(Session session, Throwable error) {
|
|
- log.error("用户错误:{},原因:{}", this.sessionId, error);
|
|
|
|
|
|
+ log.error("用户错误:{},原因:{}", this.websocketSessionId, error);
|
|
throw new BusinessException(error.getMessage());
|
|
throw new BusinessException(error.getMessage());
|
|
}
|
|
}
|
|
|
|
|
|
@@ -217,7 +214,7 @@ public class WebSocketOeServer implements Concurrently {
|
|
* 在线人数加一
|
|
* 在线人数加一
|
|
*/
|
|
*/
|
|
public synchronized void addOnlineCount() {
|
|
public synchronized void addOnlineCount() {
|
|
- if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
|
|
|
|
|
|
+ if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
|
|
try {
|
|
try {
|
|
Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
|
|
Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
|
|
int count = 0;
|
|
int count = 0;
|
|
@@ -227,8 +224,8 @@ public class WebSocketOeServer implements Concurrently {
|
|
count++;
|
|
count++;
|
|
redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count);
|
|
redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count);
|
|
} finally {
|
|
} finally {
|
|
- if (Objects.nonNull(this.sessionId)) {
|
|
|
|
- redisUtil.releaseLock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId);
|
|
|
|
|
|
+ if (Objects.nonNull(this.websocketSessionId)) {
|
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -238,7 +235,7 @@ public class WebSocketOeServer implements Concurrently {
|
|
* 在线人数减一
|
|
* 在线人数减一
|
|
*/
|
|
*/
|
|
public synchronized void subOnlineCount() {
|
|
public synchronized void subOnlineCount() {
|
|
- if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
|
|
|
|
|
|
+ if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
|
|
try {
|
|
try {
|
|
Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
|
|
Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
|
|
int count = 0;
|
|
int count = 0;
|
|
@@ -248,8 +245,8 @@ public class WebSocketOeServer implements Concurrently {
|
|
count--;
|
|
count--;
|
|
redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count < 0 ? 0 : count);
|
|
redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count < 0 ? 0 : count);
|
|
} finally {
|
|
} finally {
|
|
- if (Objects.nonNull(this.sessionId)) {
|
|
|
|
- redisUtil.releaseLock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId);
|
|
|
|
|
|
+ if (Objects.nonNull(this.websocketSessionId)) {
|
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|