|
@@ -2,7 +2,7 @@ package com.qmth.themis.backend.websocket;
|
|
|
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.google.gson.Gson;
|
|
|
-import com.qmth.themis.backend.config.DictionaryConfig;
|
|
|
+import com.qmth.themis.backend.listener.service.MqAdminLogicService;
|
|
|
import com.qmth.themis.backend.websocketTemplete.WebSocketAdminMessageTemplete;
|
|
|
import com.qmth.themis.business.constant.SpringContextHolder;
|
|
|
import com.qmth.themis.business.constant.SystemConstant;
|
|
@@ -14,21 +14,24 @@ import com.qmth.themis.business.enums.WebsocketTypeEnum;
|
|
|
import com.qmth.themis.business.util.JacksonUtil;
|
|
|
import com.qmth.themis.business.util.RedisUtil;
|
|
|
import com.qmth.themis.business.util.WebsocketUtil;
|
|
|
+import com.qmth.themis.common.contanst.Constants;
|
|
|
import com.qmth.themis.common.enums.ExceptionResultEnum;
|
|
|
import com.qmth.themis.common.exception.BusinessException;
|
|
|
import com.qmth.themis.common.signature.SignatureInfo;
|
|
|
import com.qmth.themis.common.signature.SignatureType;
|
|
|
-import com.qmth.themis.common.util.Result;
|
|
|
import com.qmth.themis.mq.dto.MqDto;
|
|
|
import com.qmth.themis.mq.enums.MqTagEnum;
|
|
|
import com.qmth.themis.mq.enums.MqTopicEnum;
|
|
|
import com.qmth.themis.mq.service.MqDtoService;
|
|
|
+import com.qmth.themis.mq.templete.Concurrently;
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
+import org.apache.rocketmq.common.message.MessageExt;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.websocket.*;
|
|
|
-import javax.websocket.server.PathParam;
|
|
|
import javax.websocket.server.ServerEndpoint;
|
|
|
import java.io.IOException;
|
|
|
import java.lang.reflect.Method;
|
|
@@ -47,11 +50,9 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
*/
|
|
|
@ServerEndpoint("/admin")
|
|
|
@Component
|
|
|
-public class WebSocketAdminServer
|
|
|
-// implements MessageListenerConcurrently
|
|
|
-{
|
|
|
+public class WebSocketAdminServer implements Concurrently {
|
|
|
private final static Logger log = LoggerFactory.getLogger(WebSocketAdminServer.class);
|
|
|
- private volatile static ConcurrentHashMap<String, WebSocketAdminServer> webSocketMap = new ConcurrentHashMap<>();
|
|
|
+ public volatile static ConcurrentHashMap<String, WebSocketAdminServer> webSocketMap = new ConcurrentHashMap<>();
|
|
|
/**
|
|
|
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
|
|
|
*/
|
|
@@ -111,18 +112,13 @@ public class WebSocketAdminServer
|
|
|
}
|
|
|
//发送恢复网络mq消息
|
|
|
log.info("用户连接:" + this.sessionId + ",当前在线人数为:" + getOnlineCount());
|
|
|
- try {
|
|
|
- InetSocketAddress addr = (InetSocketAddress) WebsocketUtil.getFieldInstance(this.session.getAsyncRemote(), "base#socketWrapper#socket#sc#remoteAddress");
|
|
|
- this.ip = addr.toString().replace("/", "").split(":")[0];
|
|
|
- this.sendMessage("ip[" + this.ip + "]连接成功");
|
|
|
- tranMap = new HashMap<>();
|
|
|
- tranMap.put("deviceId", this.deviceId);
|
|
|
- tranMap.put("ip", this.ip);
|
|
|
- tranMap.put("updateTime", this.updateTime);
|
|
|
- } catch (IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
- log.error("用户:" + this.sessionId + ",网络异常!!!!!!");
|
|
|
- }
|
|
|
+ InetSocketAddress addr = (InetSocketAddress) WebsocketUtil.getFieldInstance(this.session.getAsyncRemote(), "base#socketWrapper#socket#sc#remoteAddress");
|
|
|
+ this.ip = addr.toString().replace("/", "").split(":")[0];
|
|
|
+ this.sendMessage("ip[" + this.ip + "]连接成功");
|
|
|
+ tranMap = new HashMap<>();
|
|
|
+ tranMap.put("deviceId", this.deviceId);
|
|
|
+ tranMap.put("ip", this.ip);
|
|
|
+ tranMap.put("updateTime", this.updateTime);
|
|
|
} else {
|
|
|
throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
|
|
|
}
|
|
@@ -212,31 +208,19 @@ public class WebSocketAdminServer
|
|
|
* @param message
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public void sendMessage(String message) throws IOException {
|
|
|
+ public void sendMessage(Object message) {
|
|
|
log.info("message:{}", message);
|
|
|
- this.session.getAsyncRemote().sendText(message);
|
|
|
+ this.session.getAsyncRemote().sendText(JacksonUtil.parseJson(message));
|
|
|
this.updateTime = System.currentTimeMillis();
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 发送自定义消息
|
|
|
- */
|
|
|
- public static void sendInfo(String message, @PathParam("sessionId") String sessionId) throws IOException {
|
|
|
- log.info("发送消息到:{},报文:{}", sessionId, message);
|
|
|
- if (Objects.nonNull(sessionId) && webSocketMap.containsKey(sessionId)) {
|
|
|
- webSocketMap.get(sessionId).sendMessage(message);
|
|
|
- } else {
|
|
|
- log.error("用户[:{}]不在线!", sessionId);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* 获取在线人数
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
public synchronized int getOnlineCount() {
|
|
|
- Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
|
|
|
+ Object o = redisUtil.get(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT);
|
|
|
return Objects.isNull(o) ? 0 : (int) o;
|
|
|
}
|
|
|
|
|
@@ -245,13 +229,13 @@ public class WebSocketAdminServer
|
|
|
*/
|
|
|
public synchronized void addOnlineCount() {
|
|
|
if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
|
|
|
- Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
|
|
|
+ Object o = redisUtil.get(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT);
|
|
|
int count = 0;
|
|
|
if (Objects.nonNull(o)) {
|
|
|
count = (int) o;
|
|
|
}
|
|
|
count++;
|
|
|
- redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count);
|
|
|
+ redisUtil.set(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT, count);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -260,67 +244,59 @@ public class WebSocketAdminServer
|
|
|
*/
|
|
|
public synchronized void subOnlineCount() {
|
|
|
if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
|
|
|
- Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
|
|
|
+ Object o = redisUtil.get(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT);
|
|
|
int count = 0;
|
|
|
if (Objects.nonNull(o)) {
|
|
|
count = (int) o;
|
|
|
}
|
|
|
count--;
|
|
|
- redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count < 0 ? 0 : count);
|
|
|
+ redisUtil.set(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT, count < 0 ? 0 : count);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
+ RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
|
|
|
+ MqAdminLogicService mqAdminLogicService = SpringContextHolder.getBean(MqAdminLogicService.class);
|
|
|
+ MqDto mqDto = null;
|
|
|
+ try {
|
|
|
+ long threadId = Thread.currentThread().getId();
|
|
|
+ String threadName = Thread.currentThread().getName();
|
|
|
+ for (MessageExt messageExt : msgs) {
|
|
|
+ log.info(":{}-:{} websocket oe Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
+ mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
+ log.info(":{}-:{} websocket oe Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
+ int reconsumeTime = messageExt.getReconsumeTimes();
|
|
|
+ if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
|
|
|
+ mqAdminLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
+ } else {
|
|
|
+ if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
+ mqAdminLogicService.execMqAdminLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
+ } else {
|
|
|
+ log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ if (Objects.nonNull(mqDto)) {
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
|
|
|
+ }
|
|
|
}
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
}
|
|
|
|
|
|
-// @Override
|
|
|
-// public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
-// try {
|
|
|
-// long threadId = Thread.currentThread().getId();
|
|
|
-// String threadName = Thread.currentThread().getName();
|
|
|
-// for (MessageExt messageExt : msgs) {
|
|
|
-// log.info(":{}-:{} websocketConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
-// MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
-// log.info(":{}-:{} websocketConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
-// log.info(":{}-:{} websocketConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
|
-// Map map = mqDto.getProperties();
|
|
|
-// String body = JacksonUtil.parseJson(mqDto.getBody());
|
|
|
-// log.info("map:{},body:{}", JacksonUtil.parseJson(map), body);
|
|
|
-// String model = String.valueOf(map.get("model"));
|
|
|
-// MessageModel messageModel = MessageModel.valueOf(model);
|
|
|
-// if (messageModel.ordinal() == MessageModel.CLUSTERING.ordinal()) {
|
|
|
-// webSocketMap.get(map.get("toUserId")).sendMessage(body);
|
|
|
-// } else {
|
|
|
-// webSocketMap.forEach((k, v) -> {
|
|
|
-// try {
|
|
|
-// v.sendMessage(body);
|
|
|
-// } catch (IOException e) {
|
|
|
-// e.printStackTrace();
|
|
|
-// }
|
|
|
-// });
|
|
|
-// }
|
|
|
-// }
|
|
|
-// } catch (Exception e) {
|
|
|
-// e.printStackTrace();
|
|
|
-// return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
-// }
|
|
|
-// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
-// }
|
|
|
-//
|
|
|
-// @Service
|
|
|
-// @RocketMQMessageListener(consumerGroup = "websocketConsumerImGroup", topic = "websocketImTopic", selectorType = SelectorType.TAG, selectorExpression = "*")
|
|
|
-// public class sessionConsumerWeb implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
-//
|
|
|
-// @Override
|
|
|
-// public void onMessage(Message message) {
|
|
|
-// //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
|
|
|
-// }
|
|
|
-//
|
|
|
-// @Override
|
|
|
-// public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
-// defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
-// defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
-// defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
-//// defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
|
|
|
-// defaultMQPushConsumer.registerMessageListener(WebSocketServer.this::consumeMessage);
|
|
|
-// }
|
|
|
-// }
|
|
|
+ public static ConcurrentHashMap<String, WebSocketAdminServer> getWebSocketMap() {
|
|
|
+ return webSocketMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ public static void setWebSocketMap(ConcurrentHashMap<String, WebSocketAdminServer> webSocketMap) {
|
|
|
+ WebSocketAdminServer.webSocketMap = webSocketMap;
|
|
|
+ }
|
|
|
}
|
|
|
|