|
@@ -23,6 +23,7 @@ import com.qmth.themis.exam.config.ExamConstant;
|
|
|
import com.qmth.themis.exam.listener.service.MqOeLogicService;
|
|
|
import com.qmth.themis.exam.websocketTemplete.WebSocketOeMessageTemplete;
|
|
|
import com.qmth.themis.mq.templete.Concurrently;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
@@ -50,21 +51,51 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
@ServerEndpoint("/ws/oe")
|
|
|
@Component
|
|
|
public class WebSocketOeServer implements Concurrently {
|
|
|
+
|
|
|
private final static Logger log = LoggerFactory.getLogger(WebSocketOeServer.class);
|
|
|
+
|
|
|
private volatile static ConcurrentHashMap<Long, WebSocketOeServer> webSocketMap = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
/**
|
|
|
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
|
|
|
*/
|
|
|
private Session session = null;
|
|
|
+
|
|
|
private String sessionId = null, ip = null;
|
|
|
+
|
|
|
private Long recordId = null;
|
|
|
+
|
|
|
private String platform = null, deviceId = null, Authorization = null;
|
|
|
+
|
|
|
private Long time = null;
|
|
|
+
|
|
|
private RedisUtil redisUtil;
|
|
|
+
|
|
|
private Long updateTime = null;
|
|
|
+
|
|
|
private Map<String, Object> tranMap = null;
|
|
|
+
|
|
|
private String url = "/ws/oe";
|
|
|
|
|
|
+ private static String getStringParameter(Map<String, List<String>> parameters, String key) {
|
|
|
+ List<String> list = parameters.get(key);
|
|
|
+ if (list == null || list.isEmpty()) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return list.get(0);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Long getLongParameter(Map<String, List<String>> parameters, String key) {
|
|
|
+ String value = getStringParameter(parameters, key);
|
|
|
+ if (value != null) {
|
|
|
+ try {
|
|
|
+ return Long.parseLong(value);
|
|
|
+ } catch (Exception e) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 连接建立成功调用的方法
|
|
|
*/
|
|
@@ -74,73 +105,97 @@ public class WebSocketOeServer implements Concurrently {
|
|
|
if (Objects.isNull(mapParameter)) {
|
|
|
throw new BusinessException(ExceptionResultEnum.PARAMS_ILLEGALITY);
|
|
|
}
|
|
|
- log.info("mapParameter:{}", JacksonUtil.parseJson(mapParameter));
|
|
|
- log.info("uri:{}", session.getRequestURI());
|
|
|
- if (Objects.isNull(mapParameter.get("platform")) || mapParameter.get("platform").size() == 0) {
|
|
|
+ log.info("OE websocket connect incoming, parameters:{}", JacksonUtil.parseJson(mapParameter));
|
|
|
+ this.platform = getStringParameter(mapParameter, "platform");
|
|
|
+ this.deviceId = getStringParameter(mapParameter, "deviceId");
|
|
|
+ this.Authorization = getStringParameter(mapParameter, "Authorization");
|
|
|
+ this.time = getLongParameter(mapParameter, "time");
|
|
|
+ this.recordId = getLongParameter(mapParameter, "recordId");
|
|
|
+ if (this.time == null) {
|
|
|
+ throw new BusinessException(ExceptionResultEnum.TIME_INVALID);
|
|
|
+ }
|
|
|
+ if (this.recordId == null) {
|
|
|
+ throw new BusinessException(ExceptionResultEnum.RECORD_ID_INVALID);
|
|
|
+ }
|
|
|
+ if (StringUtils.isBlank(this.platform)) {
|
|
|
throw new BusinessException(ExceptionResultEnum.PLATFORM_INVALID);
|
|
|
}
|
|
|
- this.platform = String.valueOf(mapParameter.get("platform").get(0));
|
|
|
- if (Objects.isNull(mapParameter.get("deviceId")) || mapParameter.get("deviceId").size() == 0) {
|
|
|
+ if (StringUtils.isBlank(this.deviceId)) {
|
|
|
throw new BusinessException(ExceptionResultEnum.DEVICE_ID_INVALID);
|
|
|
}
|
|
|
- this.deviceId = String.valueOf(mapParameter.get("deviceId").get(0));
|
|
|
- this.Authorization = String.valueOf(mapParameter.get("Authorization").get(0));
|
|
|
- this.time = Long.parseLong(String.valueOf(mapParameter.get("time").get(0)));
|
|
|
- this.recordId = Long.parseLong(String.valueOf(mapParameter.get("recordId").get(0)));
|
|
|
-// final SignatureInfo info = SignatureInfo
|
|
|
-// .parse(Authorization);
|
|
|
- if (!SystemConstant.expire(this.time.longValue())) {
|
|
|
- final SignatureInfo info = SignatureInfo
|
|
|
- .parse(SystemConstant.GET, url, this.time, this.Authorization);
|
|
|
- if (Objects.nonNull(info) && info.getType() == SignatureType.TOKEN) {
|
|
|
- String sessionId = info.getInvoker();
|
|
|
- redisUtil = SpringContextHolder.getBean(RedisUtil.class);
|
|
|
- TBSession tbSession = (TBSession) redisUtil.getUserSession(sessionId);
|
|
|
- if (Objects.isNull(tbSession)) {
|
|
|
- throw new BusinessException(ExceptionResultEnum.LOGIN_NO);
|
|
|
- } else {
|
|
|
- if (info.validate(tbSession.getAccessToken()) && info.getTimestamp() < tbSession.getExpireTime()
|
|
|
- && platform.equalsIgnoreCase(tbSession.getPlatform()) && Objects.equals(deviceId, tbSession.getDeviceId())) {
|
|
|
- this.session = session;
|
|
|
- session.setMaxIdleTimeout(SystemConstant.WEBSOCKET_MAX_TIME_OUT);
|
|
|
- this.sessionId = tbSession.getId();
|
|
|
- if (webSocketMap.containsKey(this.recordId)) {
|
|
|
- webSocketMap.remove(this.recordId);
|
|
|
- webSocketMap.put(this.recordId, this);
|
|
|
- } else {
|
|
|
- webSocketMap.put(this.recordId, this);
|
|
|
- addOnlineCount();
|
|
|
- }
|
|
|
- log.info("用户连接:{},当前在线人数为:{}", this.sessionId, getOnlineCount());
|
|
|
- InetSocketAddress addr = (InetSocketAddress) WebsocketUtil.getFieldInstance(this.session.getAsyncRemote(), "base#socketWrapper#socket#sc#remoteAddress");
|
|
|
- this.ip = addr.toString().replace("/", "").split(":")[0];
|
|
|
-// this.sendMessage("ip[" + this.ip + "]连接成功");
|
|
|
- log.info("ip[:{}]连接成功", this.ip);
|
|
|
- ExamRecordCacheUtil.setClientWebsocketStatus(recordId, WebsocketStatusEnum.ON_LINE, false);
|
|
|
- ExamRecordCacheUtil.setClientCurrentIp(recordId, this.ip, false);
|
|
|
- ExamRecordCacheUtil.setClientWebsocketId(recordId, this.session.getId(), false);
|
|
|
- Long clientLastSyncTime = System.currentTimeMillis();
|
|
|
- ExamRecordCacheUtil.setClientLastSyncTime(recordId, clientLastSyncTime, false);
|
|
|
- String[] columns = new String[]{ExamRecordFieldEnum.client_websocket_status.name(), ExamRecordFieldEnum.client_current_ip.name(), ExamRecordFieldEnum.client_websocket_id.name(), ExamRecordFieldEnum.client_last_sync_time.name()};
|
|
|
- Object[] values = new Object[]{WebsocketStatusEnum.ON_LINE, this.ip, this.session.getId(), clientLastSyncTime};
|
|
|
- TOeExamRecordService tOeExamRecordService = SpringContextHolder.getBean(TOeExamRecordService.class);
|
|
|
- tOeExamRecordService.dataUpdatesMq(recordId, columns, values);
|
|
|
- tranMap = new HashMap<>();
|
|
|
- tranMap.put("recordId", this.recordId);
|
|
|
- tranMap.put("deviceId", this.deviceId);
|
|
|
- tranMap.put("ip", this.ip);
|
|
|
- this.updateTime = System.currentTimeMillis();
|
|
|
- tranMap.put("updateTime", this.updateTime);
|
|
|
- } else {
|
|
|
- throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
|
|
|
- }
|
|
|
- } else {
|
|
|
+ //校验时间戳是否过期
|
|
|
+ if (SystemConstant.expire(time)) {
|
|
|
+ log.warn("Authorization faile: time expired, server time=" + System.currentTimeMillis());
|
|
|
+ //throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
|
|
|
+ }
|
|
|
+ //校验签名信息
|
|
|
+ final SignatureInfo info = SignatureInfo.parse(SystemConstant.GET, url, this.time, this.Authorization);
|
|
|
+ if (info == null) {
|
|
|
+ log.warn("Authorization faile: signature decode error");
|
|
|
+ throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
|
|
|
+ }
|
|
|
+ if (SignatureType.TOKEN != info.getType()) {
|
|
|
+ log.warn("Authorization faile: signature type is not Token");
|
|
|
throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
|
|
|
}
|
|
|
+ //校验session
|
|
|
+ String sessionId = info.getInvoker();
|
|
|
+ redisUtil = SpringContextHolder.getBean(RedisUtil.class);
|
|
|
+ TBSession tbSession = (TBSession) redisUtil.getUserSession(sessionId);
|
|
|
+ if (Objects.isNull(tbSession)) {
|
|
|
+ log.warn("Authorization faile: session id not exists: " + sessionId);
|
|
|
+ throw new BusinessException(ExceptionResultEnum.LOGIN_NO);
|
|
|
+ }
|
|
|
+ if (tbSession.getExpireTime() < System.currentTimeMillis() || info.getTimestamp() > tbSession.getExpireTime()) {
|
|
|
+ log.warn("Authorization faile: session has expired, expire time=" + tbSession.getExpireTime());
|
|
|
+ throw new BusinessException(ExceptionResultEnum.LOGIN_NO);
|
|
|
+ }
|
|
|
+ if (!info.validate(tbSession.getAccessToken())) {
|
|
|
+ log.warn("Authorization faile: access token invalid, session token is " + tbSession.getAccessToken());
|
|
|
+ //throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
|
|
|
+ }
|
|
|
+ if (!tbSession.getPlatform().equalsIgnoreCase(platform)) {
|
|
|
+ log.warn("Authorization faile: platform invalid, session platform is " + tbSession.getPlatform());
|
|
|
+ throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
|
|
|
+ }
|
|
|
+ if (!tbSession.getDeviceId().equalsIgnoreCase(deviceId)) {
|
|
|
+ log.warn("Authorization faile: deviceId invalid, session deviceId is " + tbSession.getDeviceId());
|
|
|
+ throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
|
|
|
+ }
|
|
|
+ this.session = session;
|
|
|
+ session.setMaxIdleTimeout(SystemConstant.WEBSOCKET_MAX_TIME_OUT);
|
|
|
+ this.sessionId = tbSession.getId();
|
|
|
+ if (webSocketMap.containsKey(this.recordId)) {
|
|
|
+ webSocketMap.remove(this.recordId);
|
|
|
+ webSocketMap.put(this.recordId, this);
|
|
|
+ } else {
|
|
|
+ webSocketMap.put(this.recordId, this);
|
|
|
+ addOnlineCount();
|
|
|
+ }
|
|
|
+ log.info("用户连接:{},当前在线人数为:{}", this.sessionId, getOnlineCount());
|
|
|
+ InetSocketAddress addr = (InetSocketAddress) WebsocketUtil
|
|
|
+ .getFieldInstance(this.session.getAsyncRemote(), "base#socketWrapper#socket#sc#remoteAddress");
|
|
|
+ this.ip = addr.toString().replace("/", "").split(":")[0];
|
|
|
+ // this.sendMessage("ip[" + this.ip + "]连接成功");
|
|
|
+ log.info("ip[:{}]连接成功", this.ip);
|
|
|
+ ExamRecordCacheUtil.setClientWebsocketStatus(recordId, WebsocketStatusEnum.ON_LINE, false);
|
|
|
+ ExamRecordCacheUtil.setClientCurrentIp(recordId, this.ip, false);
|
|
|
+ ExamRecordCacheUtil.setClientWebsocketId(recordId, this.session.getId(), false);
|
|
|
+ Long clientLastSyncTime = System.currentTimeMillis();
|
|
|
+ ExamRecordCacheUtil.setClientLastSyncTime(recordId, clientLastSyncTime, false);
|
|
|
+ String[] columns = new String[] { ExamRecordFieldEnum.client_websocket_status.name(),
|
|
|
+ ExamRecordFieldEnum.client_current_ip.name(), ExamRecordFieldEnum.client_websocket_id.name(),
|
|
|
+ ExamRecordFieldEnum.client_last_sync_time.name() };
|
|
|
+ Object[] values = new Object[] { WebsocketStatusEnum.ON_LINE, this.ip, this.session.getId(),
|
|
|
+ clientLastSyncTime };
|
|
|
+ TOeExamRecordService tOeExamRecordService = SpringContextHolder.getBean(TOeExamRecordService.class);
|
|
|
+ tOeExamRecordService.dataUpdatesMq(recordId, columns, values);
|
|
|
+ tranMap = new HashMap<>();
|
|
|
+ tranMap.put("recordId", this.recordId);
|
|
|
+ tranMap.put("deviceId", this.deviceId);
|
|
|
+ tranMap.put("ip", this.ip);
|
|
|
+ this.updateTime = System.currentTimeMillis();
|
|
|
+ tranMap.put("updateTime", this.updateTime);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -157,21 +212,25 @@ public class WebSocketOeServer implements Concurrently {
|
|
|
Date now = new Date();
|
|
|
ExamRecordCacheUtil.setClientWebsocketStatus(recordId, WebsocketStatusEnum.OFF_LINE, true);
|
|
|
ExamRecordStatusEnum status = ExamRecordCacheUtil.getStatus(this.recordId);
|
|
|
- if (!Objects.equals(status, ExamRecordStatusEnum.FIRST_PREPARE) && !Objects.equals(status, ExamRecordStatusEnum.FINISHED) && !Objects.equals(status, ExamRecordStatusEnum.PERSISTED)) {
|
|
|
+ if (!Objects.equals(status, ExamRecordStatusEnum.FIRST_PREPARE) && !Objects
|
|
|
+ .equals(status, ExamRecordStatusEnum.FINISHED) && !Objects
|
|
|
+ .equals(status, ExamRecordStatusEnum.PERSISTED)) {
|
|
|
//大于等于超时时间,说明规定时间内都没有通信,非正常退出,因为期间会有心跳更新updateTime
|
|
|
if ((now.getTime() - this.updateTime) / 1000 >= SystemConstant.WEBSOCKET_MAX_TIME_OUT / 1000) {
|
|
|
log.info("超时退出");
|
|
|
//发送延时mq消息start
|
|
|
MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
|
|
|
String level = "2m";
|
|
|
-// String level = "30s";
|
|
|
+ // String level = "30s";
|
|
|
Integer time = SystemConstant.mqDelayLevel.get(level);
|
|
|
LocalDateTime dt = LocalDateTime.now();
|
|
|
dt = dt.plusMinutes(Long.parseLong(level.replace("m", "")));
|
|
|
-// dt = dt.plusSeconds(Long.parseLong(level.replace("s", "")));
|
|
|
+ // dt = dt.plusSeconds(Long.parseLong(level.replace("s", "")));
|
|
|
tranMap.put("timeOut", time);
|
|
|
tranMap.put("mqExecTime", dt.toInstant(ZoneOffset.of("+8")).toEpochMilli());
|
|
|
- MqDto mqDto = new MqDto(MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.OE_UN_NORMAL.name(), MqTagEnum.OE_UN_NORMAL, MqTagEnum.OE_UN_NORMAL, String.valueOf(this.recordId), this.tranMap, this.sessionId);
|
|
|
+ MqDto mqDto = new MqDto(MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.OE_UN_NORMAL.name(),
|
|
|
+ MqTagEnum.OE_UN_NORMAL, MqTagEnum.OE_UN_NORMAL, String.valueOf(this.recordId), this.tranMap,
|
|
|
+ this.sessionId);
|
|
|
mqDtoService.assembleSendAsyncDelayMsg(mqDto);
|
|
|
//发送延时mq消息end
|
|
|
} else {
|
|
@@ -201,13 +260,17 @@ public class WebSocketOeServer implements Concurrently {
|
|
|
JSONObject jsonObject = JSONObject.parseObject(message);
|
|
|
log.info("onMessage:{}", jsonObject.toJSONString());
|
|
|
if (Objects.nonNull(jsonObject)) {
|
|
|
- WebSocketOeMessageTemplete webSocketOeMessageTemplete = SpringContextHolder.getBean(WebSocketOeMessageTemplete.class);
|
|
|
+ WebSocketOeMessageTemplete webSocketOeMessageTemplete = SpringContextHolder
|
|
|
+ .getBean(WebSocketOeMessageTemplete.class);
|
|
|
Gson gson = new Gson();
|
|
|
WebsocketDto websocketDto = gson.fromJson(gson.toJson(jsonObject), WebsocketDto.class);
|
|
|
jsonObject.getJSONObject("body").put("recordId", this.recordId);
|
|
|
websocketDto.setBody(jsonObject.getJSONObject("body"));
|
|
|
- Method method = webSocketOeMessageTemplete.getClass().getDeclaredMethod(WebsocketTypeEnum.valueOf(websocketDto.getType()).getDesc(), String.class);
|
|
|
- WebsocketDto result = (WebsocketDto) method.invoke(webSocketOeMessageTemplete, String.valueOf(websocketDto.getBody()));
|
|
|
+ Method method = webSocketOeMessageTemplete.getClass()
|
|
|
+ .getDeclaredMethod(WebsocketTypeEnum.valueOf(websocketDto.getType()).getDesc(),
|
|
|
+ String.class);
|
|
|
+ WebsocketDto result = (WebsocketDto) method
|
|
|
+ .invoke(webSocketOeMessageTemplete, String.valueOf(websocketDto.getBody()));
|
|
|
this.sendMessage(result);
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
@@ -257,7 +320,8 @@ public class WebSocketOeServer implements Concurrently {
|
|
|
* 在线人数加一
|
|
|
*/
|
|
|
public synchronized void addOnlineCount() {
|
|
|
- if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
|
|
|
+ if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId,
|
|
|
+ SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
|
|
|
try {
|
|
|
Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
|
|
|
int count = 0;
|
|
@@ -278,7 +342,8 @@ public class WebSocketOeServer implements Concurrently {
|
|
|
* 在线人数减一
|
|
|
*/
|
|
|
public synchronized void subOnlineCount() {
|
|
|
- if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
|
|
|
+ if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId,
|
|
|
+ SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
|
|
|
try {
|
|
|
Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
|
|
|
int count = 0;
|
|
@@ -296,8 +361,8 @@ public class WebSocketOeServer implements Concurrently {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext
|
|
|
- consumeConcurrentlyContext) {
|
|
|
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
|
|
|
+ ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
|
|
|
MqOeLogicService mqOeLogicService = SpringContextHolder.getBean(MqOeLogicService.class);
|
|
|
MqDto mqDto = null;
|
|
@@ -312,7 +377,10 @@ public class WebSocketOeServer implements Concurrently {
|
|
|
if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
|
|
|
mqOeLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
} else {
|
|
|
- if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
+ if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE
|
|
|
+ && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId()))
|
|
|
+ && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(),
|
|
|
+ SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
try {
|
|
|
mqOeLogicService.execMqOeLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|