|
@@ -1,259 +0,0 @@
|
|
|
-package com.qmth.themis.admin.websocket;
|
|
|
-
|
|
|
-import com.alibaba.fastjson.JSONObject;
|
|
|
-import com.google.gson.Gson;
|
|
|
-import com.qmth.themis.admin.websocket.interceptor.WebSocketAdminHandshakeInterceptor;
|
|
|
-import com.qmth.themis.admin.websocketTemplete.WebSocketAdminMessageTemplete;
|
|
|
-import com.qmth.themis.business.constant.SpringContextHolder;
|
|
|
-import com.qmth.themis.business.constant.SystemConstant;
|
|
|
-import com.qmth.themis.business.dto.WebsocketDto;
|
|
|
-import com.qmth.themis.business.entity.TBSession;
|
|
|
-import com.qmth.themis.business.enums.WebsocketTypeEnum;
|
|
|
-import com.qmth.themis.business.util.JacksonUtil;
|
|
|
-import com.qmth.themis.business.util.RedisUtil;
|
|
|
-import com.qmth.themis.business.util.UidUtil;
|
|
|
-import com.qmth.themis.business.util.WebsocketUtil;
|
|
|
-import com.qmth.themis.common.contanst.Constants;
|
|
|
-import com.qmth.themis.common.enums.ExceptionResultEnum;
|
|
|
-import com.qmth.themis.common.exception.BusinessException;
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
-import org.springframework.stereotype.Component;
|
|
|
-
|
|
|
-import javax.websocket.*;
|
|
|
-import javax.websocket.server.ServerEndpoint;
|
|
|
-import java.io.IOException;
|
|
|
-import java.lang.reflect.Method;
|
|
|
-import java.net.InetSocketAddress;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.Objects;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
-
|
|
|
-/**
|
|
|
- * @Description: websocker管理端
|
|
|
- * @Param:
|
|
|
- * @return:
|
|
|
- * @Author: wangliang
|
|
|
- * @Date: 2020/7/10
|
|
|
- */
|
|
|
-@ServerEndpoint(value = "/ws/admin", configurator = WebSocketAdminHandshakeInterceptor.class)
|
|
|
-@Component
|
|
|
-public class WebSocketAdminServer
|
|
|
-// implements Orderly
|
|
|
-{
|
|
|
- private final static Logger log = LoggerFactory.getLogger(WebSocketAdminServer.class);
|
|
|
- public volatile static ConcurrentHashMap<Long, WebSocketAdminServer> webSocketMap = new ConcurrentHashMap<>();
|
|
|
- /**
|
|
|
- * 与某个客户端的连接会话,需要通过它来给客户端发送数据
|
|
|
- */
|
|
|
- private Session session = null;
|
|
|
- private String sessionId = null, ip = null, deviceId = null, websocketSessionId = null;
|
|
|
- private Long userId = null, updateTime = null;
|
|
|
- private RedisUtil redisUtil;
|
|
|
- private Map<String, Object> tranMap = null;
|
|
|
-
|
|
|
- /**
|
|
|
- * 连接建立成功调用的方法
|
|
|
- */
|
|
|
- @OnOpen
|
|
|
- public void onOpen(Session session) {
|
|
|
- this.redisUtil = SpringContextHolder.getBean(RedisUtil.class);
|
|
|
- this.userId = (Long) session.getUserProperties().get(Constants.HEADER_USER_ID);
|
|
|
- this.deviceId = (String) session.getUserProperties().get(Constants.HEADER_DEVICE_ID);
|
|
|
- TBSession tbSession = (TBSession) session.getUserProperties().get(Constants.HEADER_TB_SESSION);
|
|
|
- session.setMaxIdleTimeout(SystemConstant.WEBSOCKET_MAX_TIME_OUT);
|
|
|
- this.sessionId = tbSession.getId();
|
|
|
- websocketSessionId = String.valueOf(UidUtil.nextId());
|
|
|
- if (webSocketMap.containsKey(this.userId)) {
|
|
|
- throw new BusinessException(ExceptionResultEnum.REPEAT_CONNECT_ERROR);
|
|
|
- } else {
|
|
|
- webSocketMap.put(this.userId, this);
|
|
|
- addOnlineCount();
|
|
|
- }
|
|
|
- log.info("用户连接:{},当前在线人数为:{}", this.websocketSessionId, getOnlineCount());
|
|
|
- InetSocketAddress addr = (InetSocketAddress) WebsocketUtil.getFieldInstance(this.session.getAsyncRemote(), "base#socketWrapper#socket#sc#remoteAddress");
|
|
|
- this.ip = addr.toString().replace("/", "").split(":")[0];
|
|
|
- log.info("ip[:{}]连接成功", this.ip);
|
|
|
- this.updateTime = System.currentTimeMillis();
|
|
|
- tranMap = WebsocketUtil.initWebsocket(null, userId, deviceId, ip, updateTime);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 连接关闭调用的方法
|
|
|
- */
|
|
|
- @OnClose
|
|
|
- public void onClose() {
|
|
|
- log.info("onClose is come in");
|
|
|
- if (webSocketMap.containsKey(this.userId)) {
|
|
|
- webSocketMap.remove(this.userId);
|
|
|
- //从set中删除
|
|
|
- subOnlineCount();
|
|
|
- //管理端无需发送延时mq消息
|
|
|
- }
|
|
|
- log.info("用户退出:{},当前在线人数为:{},updateTime:{}", this.websocketSessionId, getOnlineCount(), this.updateTime);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 收到客户端消息后调用的方法
|
|
|
- *
|
|
|
- * @param message
|
|
|
- * @param session
|
|
|
- */
|
|
|
- @OnMessage
|
|
|
- public void onMessage(String message, Session session) {
|
|
|
- //可以群发消息
|
|
|
- //消息保存到数据库、redis
|
|
|
- if (Objects.nonNull(message) && session.isOpen()) {
|
|
|
- try {
|
|
|
- //解析发送的报文
|
|
|
- JSONObject jsonObject = JSONObject.parseObject(message);
|
|
|
- log.info("onMessage:{}", jsonObject.toJSONString());
|
|
|
- if (Objects.nonNull(jsonObject)) {
|
|
|
- WebSocketAdminMessageTemplete webSocketAdminMessageTemplete = SpringContextHolder.getBean(WebSocketAdminMessageTemplete.class);
|
|
|
- Gson gson = new Gson();
|
|
|
- WebsocketDto websocketDto = gson.fromJson(gson.toJson(jsonObject), WebsocketDto.class);
|
|
|
- Method method = webSocketAdminMessageTemplete.getClass().getDeclaredMethod(WebsocketTypeEnum.valueOf(websocketDto.getType().toUpperCase()).getDesc(), String.class, Long.class);
|
|
|
- WebsocketDto result = (WebsocketDto) method.invoke(webSocketAdminMessageTemplete, String.valueOf(websocketDto.getBody()), websocketDto.getTime());
|
|
|
- this.sendMessage(result);
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- log.error(SystemConstant.LOG_ERROR, e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 错误
|
|
|
- *
|
|
|
- * @param session
|
|
|
- * @param error
|
|
|
- */
|
|
|
- @OnError
|
|
|
- public void onError(Session session, Throwable error) throws IOException {
|
|
|
- log.error("用户错误:{},原因:{}", this.websocketSessionId, error.getMessage());
|
|
|
- close(this);
|
|
|
- throw new BusinessException(error.getMessage());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 实现服务器主动推送
|
|
|
- *
|
|
|
- * @param message
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- public void sendMessage(Object message) {
|
|
|
- log.info("message:{}", message);
|
|
|
- this.session.getAsyncRemote().sendText(JacksonUtil.parseJson(message));
|
|
|
- this.updateTime = System.currentTimeMillis();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取在线人数
|
|
|
- *
|
|
|
- * @return
|
|
|
- */
|
|
|
- public synchronized int getOnlineCount() {
|
|
|
- Object o = redisUtil.get(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT);
|
|
|
- return Objects.isNull(o) ? 0 : (int) o;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 在线人数加一
|
|
|
- */
|
|
|
- public synchronized void addOnlineCount() {
|
|
|
- if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
|
|
|
- try {
|
|
|
- Object o = redisUtil.get(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT);
|
|
|
- int count = 0;
|
|
|
- if (Objects.nonNull(o)) {
|
|
|
- count = (int) o;
|
|
|
- }
|
|
|
- count++;
|
|
|
- redisUtil.set(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT, count);
|
|
|
- } finally {
|
|
|
- if (Objects.nonNull(this.websocketSessionId)) {
|
|
|
- redisUtil.releaseLock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 在线人数减一
|
|
|
- */
|
|
|
- public synchronized void subOnlineCount() {
|
|
|
- if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
|
|
|
- try {
|
|
|
- Object o = redisUtil.get(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT);
|
|
|
- int count = 0;
|
|
|
- if (Objects.nonNull(o)) {
|
|
|
- count = (int) o;
|
|
|
- }
|
|
|
- count--;
|
|
|
- redisUtil.set(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT, count < 0 ? 0 : count);
|
|
|
- } finally {
|
|
|
- if (Objects.nonNull(this.websocketSessionId)) {
|
|
|
- redisUtil.releaseLock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-// @Override
|
|
|
-// public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
|
|
|
-// RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
|
|
|
-// MqAdminLogicService mqAdminLogicService = SpringContextHolder.getBean(MqAdminLogicService.class);
|
|
|
-// MqDto mqDto = null;
|
|
|
-// try {
|
|
|
-// long threadId = Thread.currentThread().getId();
|
|
|
-// String threadName = Thread.currentThread().getName();
|
|
|
-// for (MessageExt messageExt : msgs) {
|
|
|
-// log.info(":{}-:{} websocket oe Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
-// mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
-// log.info(":{}-:{} websocket oe Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
-// int reconsumeTime = messageExt.getReconsumeTimes();
|
|
|
-// if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
|
|
|
-// mqAdminLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
-// } else {
|
|
|
-// if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
-// mqAdminLogicService.execMqAdminLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
-// return ConsumeOrderlyStatus.SUCCESS;
|
|
|
-// } else {
|
|
|
-// log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
-// return ConsumeConcurrentlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-// } catch (Exception e) {
|
|
|
-// log.error("mq websocket admin,消息消费出错", e);
|
|
|
-// e.printStackTrace();
|
|
|
-// return ConsumeOrderlyStatus.RECONSUME_LATER;//重试
|
|
|
-// } finally {
|
|
|
-// if (Objects.nonNull(mqDto)) {
|
|
|
-// redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
|
|
|
-// }
|
|
|
-// }
|
|
|
-// return ConsumeOrderlyStatus.SUCCESS;//成功
|
|
|
-// }
|
|
|
-
|
|
|
- /**
|
|
|
- * 关闭session
|
|
|
- *
|
|
|
- * @param webSocketAdminServer
|
|
|
- */
|
|
|
- public static void close(WebSocketAdminServer webSocketAdminServer) throws IOException {
|
|
|
- //判断当前连接是否还在线
|
|
|
- if (Objects.nonNull(webSocketAdminServer) && Objects.nonNull(webSocketAdminServer.session) && webSocketAdminServer.session.isOpen()) {
|
|
|
- webSocketAdminServer.session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, SystemConstant.WEBSOCKET_CLOSE));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public static ConcurrentHashMap<Long, WebSocketAdminServer> getWebSocketMap() {
|
|
|
- return webSocketMap;
|
|
|
- }
|
|
|
-
|
|
|
- public Long getUserId() {
|
|
|
- return userId;
|
|
|
- }
|
|
|
-}
|
|
|
-
|