WebSocketServer.java 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. //package com.qmth.themis.backend.websocket;
  2. //
  3. //import com.alibaba.fastjson.JSON;
  4. //import com.alibaba.fastjson.JSONObject;
  5. //import org.apache.commons.lang3.StringUtils;
  6. //import org.slf4j.Logger;
  7. //import org.slf4j.LoggerFactory;
  8. //import org.springframework.stereotype.Component;
  9. //
  10. //import javax.websocket.*;
  11. //import javax.websocket.server.PathParam;
  12. //import javax.websocket.server.ServerEndpoint;
  13. //import java.io.IOException;
  14. //import java.util.concurrent.ConcurrentHashMap;
  15. //
  16. ///**
  17. // * @Description: websocker服务端
  18. // * @Param:
  19. // * @return:
  20. // * @Author: wangliang
  21. // * @Date: 2020/7/10
  22. // */
  23. //@ServerEndpoint("/imserver/{userId}")
  24. //@Component
  25. //public class WebSocketServer
  26. //// implements MessageListenerConcurrently
  27. //{
  28. // private final static Logger log = LoggerFactory.getLogger(WebSocketServer.class);
  29. // /**
  30. // * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
  31. // */
  32. // private static int onlineCount = 0;
  33. // /**
  34. // * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
  35. // */
  36. // private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
  37. // /**
  38. // * 与某个客户端的连接会话,需要通过它来给客户端发送数据
  39. // */
  40. // private Session session;
  41. // /**
  42. // * 接收userId
  43. // */
  44. // private String userId = null;
  45. //
  46. // /**
  47. // * 连接建立成功调用的方法
  48. // */
  49. // @OnOpen
  50. // public void onOpen(Session session, @PathParam("userId") String userId) {
  51. // this.session = session;
  52. // this.userId = userId;
  53. // if (webSocketMap.containsKey(userId)) {
  54. // webSocketMap.remove(userId);
  55. // webSocketMap.put(userId, this);
  56. // //加入set中
  57. // } else {
  58. // webSocketMap.put(userId, this);
  59. // //加入set中
  60. // addOnlineCount();
  61. // //在线数加1
  62. // }
  63. // log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());
  64. // try {
  65. // sendMessage("连接成功");
  66. // } catch (IOException e) {
  67. // log.error("用户:" + userId + ",网络异常!!!!!!");
  68. // }
  69. // }
  70. //
  71. // /**
  72. // * 连接关闭调用的方法
  73. // */
  74. // @OnClose
  75. // public void onClose() {
  76. // if (webSocketMap.containsKey(userId)) {
  77. // webSocketMap.remove(userId);
  78. // //从set中删除
  79. // subOnlineCount();
  80. // }
  81. // log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount());
  82. // }
  83. //
  84. // /**
  85. // * 收到客户端消息后调用的方法
  86. // *
  87. // * @param message 客户端发送过来的消息
  88. // */
  89. // @OnMessage
  90. // public void onMessage(String message, Session session) {
  91. // log.info("用户消息:" + userId + ",报文:" + message);
  92. // //可以群发消息
  93. // //消息保存到数据库、redis
  94. // if (StringUtils.isNotBlank(message)) {
  95. // try {
  96. // //解析发送的报文
  97. // JSONObject jsonObject = JSON.parseObject(message);
  98. // //追加发送人(防止串改)
  99. // jsonObject.put("fromUserId", this.userId);
  100. // String toUserId = jsonObject.getString("toUserId");
  101. // //传送给对应toUserId用户的websocket
  102. // if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) {
  103. // webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
  104. // } else {
  105. // log.error("请求的userId:" + toUserId + "不在该服务器上");
  106. // //否则不在这个服务器上,发送到mysql或者redis
  107. // }
  108. // } catch (Exception e) {
  109. // e.printStackTrace();
  110. // }
  111. // }
  112. // }
  113. //
  114. // /**
  115. // * @param session
  116. // * @param error
  117. // */
  118. // @OnError
  119. // public void onError(Session session, Throwable error) {
  120. // log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
  121. // error.printStackTrace();
  122. // }
  123. //
  124. // /**
  125. // * 实现服务器主动推送
  126. // */
  127. // public void sendMessage(String message) throws IOException {
  128. // this.session.getBasicRemote().sendText(message);
  129. // }
  130. //
  131. //
  132. // /**
  133. // * 发送自定义消息
  134. // */
  135. // public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException {
  136. // log.info("发送消息到:" + userId + ",报文:" + message);
  137. // if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
  138. // webSocketMap.get(userId).sendMessage(message);
  139. // } else {
  140. // log.error("用户" + userId + ",不在线!");
  141. // }
  142. // }
  143. //
  144. // public void wxappPhotoReady(){
  145. // log.info("wxappPhotoReady is come in");
  146. // }
  147. //
  148. // public static synchronized int getOnlineCount() {
  149. // return onlineCount;
  150. // }
  151. //
  152. // public static synchronized void addOnlineCount() {
  153. // WebSocketServer.onlineCount++;
  154. // }
  155. //
  156. // public static synchronized void subOnlineCount() {
  157. // WebSocketServer.onlineCount--;
  158. // }
  159. //
  160. //// @Override
  161. //// public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  162. //// try {
  163. //// long threadId = Thread.currentThread().getId();
  164. //// String threadName = Thread.currentThread().getName();
  165. //// for (MessageExt messageExt : msgs) {
  166. //// log.info(":{}-:{} websocketConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
  167. //// MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
  168. //// log.info(":{}-:{} websocketConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
  169. //// log.info(":{}-:{} websocketConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
  170. //// Map map = mqDto.getProperties();
  171. //// String body = JacksonUtil.parseJson(mqDto.getBody());
  172. //// log.info("map:{},body:{}", JacksonUtil.parseJson(map), body);
  173. //// String model = String.valueOf(map.get("model"));
  174. //// MessageModel messageModel = MessageModel.valueOf(model);
  175. //// if (messageModel.ordinal() == MessageModel.CLUSTERING.ordinal()) {
  176. //// webSocketMap.get(map.get("toUserId")).sendMessage(body);
  177. //// } else {
  178. //// webSocketMap.forEach((k, v) -> {
  179. //// try {
  180. //// v.sendMessage(body);
  181. //// } catch (IOException e) {
  182. //// e.printStackTrace();
  183. //// }
  184. //// });
  185. //// }
  186. //// }
  187. //// } catch (Exception e) {
  188. //// e.printStackTrace();
  189. //// return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
  190. //// }
  191. //// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
  192. //// }
  193. ////
  194. //// @Service
  195. //// @RocketMQMessageListener(consumerGroup = "websocketConsumerImGroup", topic = "websocketImTopic", selectorType = SelectorType.TAG, selectorExpression = "*")
  196. //// public class sessionConsumerWeb implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
  197. ////
  198. //// @Override
  199. //// public void onMessage(Message message) {
  200. //// //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
  201. //// }
  202. ////
  203. //// @Override
  204. //// public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
  205. //// defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
  206. //// defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  207. //// defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
  208. ////// defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
  209. //// defaultMQPushConsumer.registerMessageListener(WebSocketServer.this::consumeMessage);
  210. //// }
  211. //// }
  212. //}
  213. //