|
@@ -46,14 +46,17 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
*/
|
|
*/
|
|
@ServerEndpoint("/admin")
|
|
@ServerEndpoint("/admin")
|
|
@Component
|
|
@Component
|
|
-public class WebSocketAdminServer implements Concurrently {
|
|
|
|
|
|
+public class WebSocketAdminServer
|
|
|
|
+// implements Concurrently
|
|
|
|
+{
|
|
private final static Logger log = LoggerFactory.getLogger(WebSocketAdminServer.class);
|
|
private final static Logger log = LoggerFactory.getLogger(WebSocketAdminServer.class);
|
|
- public volatile static ConcurrentHashMap<String, WebSocketAdminServer> webSocketMap = new ConcurrentHashMap<>();
|
|
|
|
|
|
+ public volatile static ConcurrentHashMap<Long, WebSocketAdminServer> webSocketMap = new ConcurrentHashMap<>();
|
|
/**
|
|
/**
|
|
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
|
|
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
|
|
*/
|
|
*/
|
|
private Session session = null;
|
|
private Session session = null;
|
|
private String sessionId = null, ip = null;
|
|
private String sessionId = null, ip = null;
|
|
|
|
+ private Long userId = null;
|
|
private String platform = null, deviceId = null, Authorization = null;
|
|
private String platform = null, deviceId = null, Authorization = null;
|
|
private Long time = null;
|
|
private Long time = null;
|
|
private RedisUtil redisUtil;
|
|
private RedisUtil redisUtil;
|
|
@@ -81,6 +84,7 @@ public class WebSocketAdminServer implements Concurrently {
|
|
this.deviceId = String.valueOf(mapParameter.get("deviceId").get(0));
|
|
this.deviceId = String.valueOf(mapParameter.get("deviceId").get(0));
|
|
this.Authorization = String.valueOf(mapParameter.get("Authorization").get(0));
|
|
this.Authorization = String.valueOf(mapParameter.get("Authorization").get(0));
|
|
this.time = Long.parseLong(String.valueOf(mapParameter.get("time").get(0)));
|
|
this.time = Long.parseLong(String.valueOf(mapParameter.get("time").get(0)));
|
|
|
|
+ this.userId = Long.parseLong(String.valueOf(mapParameter.get("userId").get(0)));
|
|
String method = SystemConstant.GET;
|
|
String method = SystemConstant.GET;
|
|
final SignatureInfo info = SignatureInfo
|
|
final SignatureInfo info = SignatureInfo
|
|
.parse(Authorization);
|
|
.parse(Authorization);
|
|
@@ -96,12 +100,12 @@ public class WebSocketAdminServer implements Concurrently {
|
|
this.session = session;
|
|
this.session = session;
|
|
session.setMaxIdleTimeout(SystemConstant.WEBSOCKET_MAX_TIME_OUT);
|
|
session.setMaxIdleTimeout(SystemConstant.WEBSOCKET_MAX_TIME_OUT);
|
|
this.sessionId = tbSession.getId();
|
|
this.sessionId = tbSession.getId();
|
|
- if (webSocketMap.containsKey(tbSession.getId())) {
|
|
|
|
- webSocketMap.remove(sessionId);
|
|
|
|
- webSocketMap.put(sessionId, this);
|
|
|
|
|
|
+ if (webSocketMap.containsKey(this.userId)) {
|
|
|
|
+ webSocketMap.remove(this.userId);
|
|
|
|
+ webSocketMap.put(this.userId, this);
|
|
//加入set中
|
|
//加入set中
|
|
} else {
|
|
} else {
|
|
- webSocketMap.put(sessionId, this);
|
|
|
|
|
|
+ webSocketMap.put(this.userId, this);
|
|
//加入set中
|
|
//加入set中
|
|
addOnlineCount();
|
|
addOnlineCount();
|
|
//在线数加1
|
|
//在线数加1
|
|
@@ -113,6 +117,7 @@ public class WebSocketAdminServer implements Concurrently {
|
|
// this.sendMessage("ip[" + this.ip + "]连接成功");
|
|
// this.sendMessage("ip[" + this.ip + "]连接成功");
|
|
log.info("ip[:{}]连接成功", this.ip);
|
|
log.info("ip[:{}]连接成功", this.ip);
|
|
tranMap = new HashMap<>();
|
|
tranMap = new HashMap<>();
|
|
|
|
+ tranMap.put("userId", this.userId);
|
|
tranMap.put("deviceId", this.deviceId);
|
|
tranMap.put("deviceId", this.deviceId);
|
|
tranMap.put("ip", this.ip);
|
|
tranMap.put("ip", this.ip);
|
|
this.updateTime = System.currentTimeMillis();
|
|
this.updateTime = System.currentTimeMillis();
|
|
@@ -132,8 +137,8 @@ public class WebSocketAdminServer implements Concurrently {
|
|
@OnClose
|
|
@OnClose
|
|
public void onClose() {
|
|
public void onClose() {
|
|
log.info("onClose is come in");
|
|
log.info("onClose is come in");
|
|
- if (webSocketMap.containsKey(this.sessionId)) {
|
|
|
|
- webSocketMap.remove(this.sessionId);
|
|
|
|
|
|
+ if (webSocketMap.containsKey(this.userId)) {
|
|
|
|
+ webSocketMap.remove(this.userId);
|
|
//从set中删除
|
|
//从set中删除
|
|
subOnlineCount();
|
|
subOnlineCount();
|
|
//管理端无需发送延时mq消息
|
|
//管理端无需发送延时mq消息
|
|
@@ -235,45 +240,49 @@ public class WebSocketAdminServer implements Concurrently {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
|
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
|
- RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
|
|
|
|
- MqAdminLogicService mqAdminLogicService = SpringContextHolder.getBean(MqAdminLogicService.class);
|
|
|
|
- MqDto mqDto = null;
|
|
|
|
- try {
|
|
|
|
- long threadId = Thread.currentThread().getId();
|
|
|
|
- String threadName = Thread.currentThread().getName();
|
|
|
|
- for (MessageExt messageExt : msgs) {
|
|
|
|
- log.info(":{}-:{} websocket oe Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
|
- mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
|
- log.info(":{}-:{} websocket oe Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
|
- int reconsumeTime = messageExt.getReconsumeTimes();
|
|
|
|
- if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
|
|
|
|
- mqAdminLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
|
- } else {
|
|
|
|
- if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
|
- mqAdminLogicService.execMqAdminLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
|
- } else {
|
|
|
|
- log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
|
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- log.error("mq websocket admin,消息消费出错", e);
|
|
|
|
- e.printStackTrace();
|
|
|
|
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
|
- } finally {
|
|
|
|
- if (Objects.nonNull(mqDto)) {
|
|
|
|
- redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
|
- }
|
|
|
|
|
|
+// @Override
|
|
|
|
+// public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
|
+// RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
|
|
|
|
+// MqAdminLogicService mqAdminLogicService = SpringContextHolder.getBean(MqAdminLogicService.class);
|
|
|
|
+// MqDto mqDto = null;
|
|
|
|
+// try {
|
|
|
|
+// long threadId = Thread.currentThread().getId();
|
|
|
|
+// String threadName = Thread.currentThread().getName();
|
|
|
|
+// for (MessageExt messageExt : msgs) {
|
|
|
|
+// log.info(":{}-:{} websocket oe Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
|
+// mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
|
+// log.info(":{}-:{} websocket oe Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
|
+// int reconsumeTime = messageExt.getReconsumeTimes();
|
|
|
|
+// if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
|
|
|
|
+// mqAdminLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
|
+// } else {
|
|
|
|
+// if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
|
+// mqAdminLogicService.execMqAdminLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
|
+// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
|
+// } else {
|
|
|
|
+// log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
|
+// return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
|
+// }
|
|
|
|
+// }
|
|
|
|
+// }
|
|
|
|
+// } catch (Exception e) {
|
|
|
|
+// log.error("mq websocket admin,消息消费出错", e);
|
|
|
|
+// e.printStackTrace();
|
|
|
|
+// return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
|
+// } finally {
|
|
|
|
+// if (Objects.nonNull(mqDto)) {
|
|
|
|
+// redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
|
|
|
|
+// }
|
|
|
|
+// }
|
|
|
|
+// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
|
+// }
|
|
|
|
|
|
- public static ConcurrentHashMap<String, WebSocketAdminServer> getWebSocketMap() {
|
|
|
|
|
|
+ public static ConcurrentHashMap<Long, WebSocketAdminServer> getWebSocketMap() {
|
|
return webSocketMap;
|
|
return webSocketMap;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ public Long getUserId() {
|
|
|
|
+ return userId;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|