|
@@ -21,6 +21,7 @@ import com.qmth.themis.common.exception.BusinessException;
|
|
|
import com.qmth.themis.common.signature.SignatureInfo;
|
|
|
import com.qmth.themis.common.signature.SignatureType;
|
|
|
import com.qmth.themis.exam.listener.service.MqOeLogicService;
|
|
|
+import com.qmth.themis.exam.websocketTemplete.WebSocketMobileMessageTemplete;
|
|
|
import com.qmth.themis.exam.websocketTemplete.WebSocketOeMessageTemplete;
|
|
|
import com.qmth.themis.mq.templete.Concurrently;
|
|
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
@@ -49,9 +50,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
*/
|
|
|
@ServerEndpoint("/mobile")
|
|
|
@Component
|
|
|
-public class WebSocketMobileServer
|
|
|
-// implements Concurrently
|
|
|
-{
|
|
|
+public class WebSocketMobileServer {
|
|
|
private final static Logger log = LoggerFactory.getLogger(WebSocketMobileServer.class);
|
|
|
private volatile static ConcurrentHashMap<Long, WebSocketMobileServer> webSocketMap = new ConcurrentHashMap<>();
|
|
|
/**
|
|
@@ -141,25 +140,6 @@ public class WebSocketMobileServer
|
|
|
webSocketMap.remove(this.recordId);
|
|
|
//从set中删除
|
|
|
subOnlineCount();
|
|
|
- //判断是否是正常退出
|
|
|
- Date now = new Date();
|
|
|
- //大于等于超时时间,说明规定时间内都没有通信,非正常退出,因为期间会有心跳更新updateTime
|
|
|
- if ((now.getTime() - this.updateTime) / 1000 >= SystemConstant.WEBSOCKET_MAX_TIME_OUT / 1000) {
|
|
|
- log.info("超时退出");
|
|
|
- //发送延时mq消息start
|
|
|
- MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
|
|
|
- String level = "2m";
|
|
|
-// String level = "30s";
|
|
|
- Integer time = SystemConstant.mqDelayLevel.get(level);
|
|
|
- LocalDateTime dt = LocalDateTime.now();
|
|
|
- dt = dt.plusMinutes(Long.parseLong(level.replace("m", "")));
|
|
|
-// dt = dt.plusSeconds(Long.parseLong(level.replace("s", "")));
|
|
|
- tranMap.put("timeOut", time);
|
|
|
- tranMap.put("mqExecTime", dt.toInstant(ZoneOffset.of("+8")).toEpochMilli());
|
|
|
- MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.oeUnNormal.name(), SystemOperationEnum.OE_NET_UN_NORMAL, MqTagEnum.oeUnNormal, String.valueOf(this.recordId), this.tranMap, this.sessionId);
|
|
|
- mqDtoService.assembleSendAsyncDelayMsg(mqDto);
|
|
|
- //发送延时mq消息end
|
|
|
- }
|
|
|
}
|
|
|
log.info("用户退出:{},当前在线人数为:{},updateTime:{}", this.sessionId, getOnlineCount(), this.updateTime);
|
|
|
}
|
|
@@ -180,14 +160,14 @@ public class WebSocketMobileServer
|
|
|
JSONObject jsonObject = JSONObject.parseObject(message);
|
|
|
log.info("onMessage:{}", jsonObject.toJSONString());
|
|
|
if (Objects.nonNull(jsonObject)) {
|
|
|
- WebSocketOeMessageTemplete webSocketOeMessageTemplete = SpringContextHolder.getBean(WebSocketOeMessageTemplete.class);
|
|
|
+ WebSocketMobileMessageTemplete webSocketMobileMessageTemplete = SpringContextHolder.getBean(WebSocketMobileMessageTemplete.class);
|
|
|
Gson gson = new Gson();
|
|
|
WebsocketDto websocketDto = gson.fromJson(gson.toJson(jsonObject), WebsocketDto.class);
|
|
|
//todo 加入当前时间和time比较的校验
|
|
|
jsonObject.getJSONObject("body").put("recordId", this.recordId);
|
|
|
websocketDto.setBody(jsonObject.getJSONObject("body"));
|
|
|
- Method method = webSocketOeMessageTemplete.getClass().getDeclaredMethod(WebsocketTypeEnum.valueOf(websocketDto.getType()).getDesc(), String.class);
|
|
|
- WebsocketDto result = (WebsocketDto) method.invoke(webSocketOeMessageTemplete, String.valueOf(websocketDto.getBody()));
|
|
|
+ Method method = webSocketMobileMessageTemplete.getClass().getDeclaredMethod(WebsocketTypeEnum.valueOf(websocketDto.getType()).getDesc(), String.class);
|
|
|
+ WebsocketDto result = (WebsocketDto) method.invoke(webSocketMobileMessageTemplete, String.valueOf(websocketDto.getBody()));
|
|
|
this.sendMessage(JSONObject.toJSONString(result));
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
@@ -261,43 +241,6 @@ public class WebSocketMobileServer
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// @Override
|
|
|
-// public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
-// RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
|
|
|
-// MqOeLogicService mqOeLogicService = SpringContextHolder.getBean(MqOeLogicService.class);
|
|
|
-// MqDto mqDto = null;
|
|
|
-// try {
|
|
|
-// long threadId = Thread.currentThread().getId();
|
|
|
-// String threadName = Thread.currentThread().getName();
|
|
|
-// for (MessageExt messageExt : msgs) {
|
|
|
-// log.info(":{}-:{} websocket oe Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
-// mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
-// log.info(":{}-:{} websocket oe Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
-// int reconsumeTime = messageExt.getReconsumeTimes();
|
|
|
-// if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
|
|
|
-// mqOeLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
-// } else {
|
|
|
-// if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
-// mqOeLogicService.execMqOeLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
-// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
-// } else {
|
|
|
-// log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
-// return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-// } catch (Exception e) {
|
|
|
-// e.printStackTrace();
|
|
|
-// return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
-//
|
|
|
-// } finally {
|
|
|
-// if (Objects.nonNull(mqDto)) {
|
|
|
-// redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
|
|
|
-// }
|
|
|
-// }
|
|
|
-// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
-// }
|
|
|
-
|
|
|
public static ConcurrentHashMap<Long, WebSocketMobileServer> getWebSocketMap() {
|
|
|
return webSocketMap;
|
|
|
}
|