|
@@ -2,6 +2,7 @@ package com.qmth.themis.exam.websocket;
|
|
|
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
import com.alibaba.fastjson.JSONObject;
|
|
import com.google.gson.Gson;
|
|
import com.google.gson.Gson;
|
|
|
|
+import com.qmth.themis.business.cache.RedisKeyHelper;
|
|
import com.qmth.themis.business.constant.SpringContextHolder;
|
|
import com.qmth.themis.business.constant.SpringContextHolder;
|
|
import com.qmth.themis.business.constant.SystemConstant;
|
|
import com.qmth.themis.business.constant.SystemConstant;
|
|
import com.qmth.themis.business.dto.MqDto;
|
|
import com.qmth.themis.business.dto.MqDto;
|
|
@@ -9,6 +10,7 @@ import com.qmth.themis.business.dto.WebsocketDto;
|
|
import com.qmth.themis.business.entity.TBSession;
|
|
import com.qmth.themis.business.entity.TBSession;
|
|
import com.qmth.themis.business.enums.MqTagEnum;
|
|
import com.qmth.themis.business.enums.MqTagEnum;
|
|
import com.qmth.themis.business.enums.MqTopicEnum;
|
|
import com.qmth.themis.business.enums.MqTopicEnum;
|
|
|
|
+import com.qmth.themis.business.enums.WebsocketStatusEnum;
|
|
import com.qmth.themis.business.enums.WebsocketTypeEnum;
|
|
import com.qmth.themis.business.enums.WebsocketTypeEnum;
|
|
import com.qmth.themis.business.service.MqDtoService;
|
|
import com.qmth.themis.business.service.MqDtoService;
|
|
import com.qmth.themis.business.util.JacksonUtil;
|
|
import com.qmth.themis.business.util.JacksonUtil;
|
|
@@ -115,6 +117,12 @@ public class WebSocketOeServer implements Concurrently {
|
|
this.ip = addr.toString().replace("/", "").split(":")[0];
|
|
this.ip = addr.toString().replace("/", "").split(":")[0];
|
|
// this.sendMessage("ip[" + this.ip + "]连接成功");
|
|
// this.sendMessage("ip[" + this.ip + "]连接成功");
|
|
log.info("ip[:{}]连接成功", this.ip);
|
|
log.info("ip[:{}]连接成功", this.ip);
|
|
|
|
+ Map<String, Object> objectMap = redisUtil.getHashEntries(RedisKeyHelper.examRecordCacheKey(recordId));
|
|
|
|
+ objectMap.put("clientWebsocketStatus", WebsocketStatusEnum.ON_LINE);
|
|
|
|
+ objectMap.put("clientCurrentIp", this.ip);
|
|
|
|
+ objectMap.put("clientWebsocketId", this.session.getId());
|
|
|
|
+ objectMap.put("clientLastSyncTime", new Date());
|
|
|
|
+ redisUtil.setForHash(RedisKeyHelper.examRecordCacheKey(recordId), objectMap);
|
|
tranMap = new HashMap<>();
|
|
tranMap = new HashMap<>();
|
|
tranMap.put("recordId", this.recordId);
|
|
tranMap.put("recordId", this.recordId);
|
|
tranMap.put("deviceId", this.deviceId);
|
|
tranMap.put("deviceId", this.deviceId);
|
|
@@ -141,9 +149,15 @@ public class WebSocketOeServer implements Concurrently {
|
|
subOnlineCount();
|
|
subOnlineCount();
|
|
//判断是否是正常退出
|
|
//判断是否是正常退出
|
|
Date now = new Date();
|
|
Date now = new Date();
|
|
|
|
+ Map<String, Object> objectMap = redisUtil.getHashEntries(RedisKeyHelper.examRecordCacheKey(recordId));
|
|
|
|
+ objectMap.put("clientWebsocketStatus", WebsocketStatusEnum.OFF_LINE);
|
|
|
|
+ redisUtil.setForHash(RedisKeyHelper.examRecordCacheKey(recordId), objectMap);
|
|
//大于等于超时时间,说明规定时间内都没有通信,非正常退出,因为期间会有心跳更新updateTime
|
|
//大于等于超时时间,说明规定时间内都没有通信,非正常退出,因为期间会有心跳更新updateTime
|
|
if ((now.getTime() - this.updateTime) / 1000 >= SystemConstant.WEBSOCKET_MAX_TIME_OUT / 1000) {
|
|
if ((now.getTime() - this.updateTime) / 1000 >= SystemConstant.WEBSOCKET_MAX_TIME_OUT / 1000) {
|
|
log.info("超时退出");
|
|
log.info("超时退出");
|
|
|
|
+ objectMap = redisUtil.getHashEntries(RedisKeyHelper.examRecordCacheKey(recordId));
|
|
|
|
+ objectMap.put("clientWebsocketStatus", WebsocketStatusEnum.OFF_LINE);
|
|
|
|
+ redisUtil.setForHash(RedisKeyHelper.examRecordCacheKey(recordId), objectMap);
|
|
//发送延时mq消息start
|
|
//发送延时mq消息start
|
|
MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
|
|
MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
|
|
String level = "2m";
|
|
String level = "2m";
|
|
@@ -285,6 +299,7 @@ public class WebSocketOeServer implements Concurrently {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
|
+ log.error("mq websocket oe,消息消费出错", e);
|
|
e.printStackTrace();
|
|
e.printStackTrace();
|
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
} finally {
|
|
} finally {
|