wangliang пре 5 година
родитељ
комит
a285dac47b

+ 24 - 3
themis-backend/src/main/java/com/qmth/themis/backend/api/TBUserController.java

@@ -9,6 +9,7 @@ import com.qmth.themis.backend.elasticsearch.repository.ETEStudentRepo;
 import com.qmth.themis.backend.elasticsearch.service.ETEStudentService;
 import com.qmth.themis.backend.elasticsearch.service.ETEStudentService;
 import com.qmth.themis.backend.mongodb.entity.*;
 import com.qmth.themis.backend.mongodb.entity.*;
 import com.qmth.themis.backend.util.ServletUtil;
 import com.qmth.themis.backend.util.ServletUtil;
+import com.qmth.themis.backend.websocket.WebSocketServer;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.dto.AuthDto;
 import com.qmth.themis.business.dto.AuthDto;
 import com.qmth.themis.business.entity.*;
 import com.qmth.themis.business.entity.*;
@@ -30,9 +31,11 @@ import com.qmth.themis.common.signature.SignatureType;
 import com.qmth.themis.common.util.AesUtil;
 import com.qmth.themis.common.util.AesUtil;
 import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.ResultUtil;
 import com.qmth.themis.common.util.ResultUtil;
+import com.qmth.themis.mq.dto.MqDto;
 import com.qmth.themis.mq.service.MqDtoService;
 import com.qmth.themis.mq.service.MqDtoService;
 import io.swagger.annotations.*;
 import io.swagger.annotations.*;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.spring.annotation.MessageModel;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.beans.factory.annotation.Value;
@@ -46,6 +49,7 @@ import org.springframework.web.bind.annotation.RestController;
 
 
 import javax.annotation.Resource;
 import javax.annotation.Resource;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.io.UnsupportedEncodingException;
 import java.security.NoSuchAlgorithmException;
 import java.security.NoSuchAlgorithmException;
 import java.security.spec.InvalidKeySpecException;
 import java.security.spec.InvalidKeySpecException;
@@ -137,6 +141,14 @@ public class TBUserController {
         return ResultUtil.ok(map);
         return ResultUtil.ok(map);
     }
     }
 
 
+    @ApiOperation(value = "es查询接口")
+    @RequestMapping(value = "/websocketPush", method = RequestMethod.POST)
+    public Result websocketPush(HttpServletRequest request, @RequestBody String message) throws IOException {
+        TBUser tbUser = (TBUser) ServletUtil.getRequestAccount(request);
+        WebSocketServer.sendInfo(message, String.valueOf(tbUser.getId()));
+        return ResultUtil.ok(SystemConstant.SUCCESS);
+    }
+
     @Resource
     @Resource
     TEExamActivityService teExamActivityService;
     TEExamActivityService teExamActivityService;
 
 
@@ -201,9 +213,18 @@ public class TBUserController {
 //        return ResultUtil.ok(map);
 //        return ResultUtil.ok(map);
 
 
 //        for (int i = 0; i < 5; i++) {
 //        for (int i = 0; i < 5; i++) {
-        mqDtoService.assembleSendOneWayMsg("imTopic", "teacher", "老师发送的一条消息1", MqEnum.MESSAGE_LOG.name(), "1", "2");
-        mqDtoService.assembleSendOneWayMsg("imTopic", "user", "用户发送的一条消息2", MqEnum.MESSAGE_LOG.name(), "1", "2");
-        mqDtoService.assembleSendOneWayMsg("imTopic", "user", "学生发送的一条消息3", MqEnum.MESSAGE_LOG.name(), "1", "2");
+
+//        mqDtoService.assembleSendOneWayMsg("imTopic", "teacher", "老师发送的一条消息1", MqEnum.MESSAGE_LOG.name(), "1", "2");
+//        mqDtoService.assembleSendOneWayMsg("imTopic", "user", "用户发送的一条消息2", MqEnum.MESSAGE_LOG.name(), "1", "2");
+//        mqDtoService.assembleSendOneWayMsg("imTopic", "user", "学生发送的一条消息3", MqEnum.MESSAGE_LOG.name(), "1", "2");
+        Map map = new HashMap();
+        map.put("sendUserId", "10");
+        map.put("toUserId", "20");
+        map.put("model", MessageModel.BROADCASTING);
+//        map.put("model", MessageModel.CLUSTERING);
+        MqDto mqDto = new MqDto("websocketImTopic", "im", "学生发送的一条消息4", MqEnum.MESSAGE_LOG, "1", map, "2");
+//        mqDtoService.assembleSendOneWayMsg("imTopic", "user", "学生发送的一条消息3", MqEnum.MESSAGE_LOG.name(), "1", "2");
+        mqDtoService.assembleSendOneWayMsg(mqDto);
         //        }
         //        }
         return ResultUtil.ok(SystemConstant.SUCCESS);
         return ResultUtil.ok(SystemConstant.SUCCESS);
     }
     }

+ 41 - 0
themis-backend/src/main/java/com/qmth/themis/backend/config/WebSocketConfig.java

@@ -0,0 +1,41 @@
+package com.qmth.themis.backend.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+/**
+ * @Description: 开启WebSocket支持
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/6/3
+ */
+@Configuration
+@EnableWebSocket
+public class WebSocketConfig
+//        implements WebSocketConfigurer
+{
+    /**
+     * ServerEndpointExporter 作用
+     * 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
+     *
+     * @return
+     */
+    @Bean
+    public ServerEndpointExporter serverEndpointExporter() {
+        return new ServerEndpointExporter();
+    }
+
+//    @Override
+//    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+//        registry.addHandler(myHandler(), "/messageHandler").setAllowedOrigins("*");
+//    }
+//
+//    @Bean
+//    public WebSocketHandler myHandler() {
+//        return new MessageHandler();
+//    }
+}
+

+ 213 - 0
themis-backend/src/main/java/com/qmth/themis/backend/websocket/WebSocketServer.java

@@ -0,0 +1,213 @@
+package com.qmth.themis.backend.websocket;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @Description: websocker服务端
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/10
+ */
+@ServerEndpoint("/imserver/{userId}")
+@Component
+public class WebSocketServer
+//        implements MessageListenerConcurrently
+{
+    private final static Logger log = LoggerFactory.getLogger(WebSocketServer.class);
+    /**
+     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
+     */
+    private static int onlineCount = 0;
+    /**
+     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
+     */
+    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
+    /**
+     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
+     */
+    private Session session;
+    /**
+     * 接收userId
+     */
+    private String userId = null;
+
+    /**
+     * 连接建立成功调用的方法
+     */
+    @OnOpen
+    public void onOpen(Session session, @PathParam("userId") String userId) {
+        this.session = session;
+        this.userId = userId;
+        if (webSocketMap.containsKey(userId)) {
+            webSocketMap.remove(userId);
+            webSocketMap.put(userId, this);
+            //加入set中
+        } else {
+            webSocketMap.put(userId, this);
+            //加入set中
+            addOnlineCount();
+            //在线数加1
+        }
+        log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());
+        try {
+            sendMessage("连接成功");
+        } catch (IOException e) {
+            log.error("用户:" + userId + ",网络异常!!!!!!");
+        }
+    }
+
+    /**
+     * 连接关闭调用的方法
+     */
+    @OnClose
+    public void onClose() {
+        if (webSocketMap.containsKey(userId)) {
+            webSocketMap.remove(userId);
+            //从set中删除
+            subOnlineCount();
+        }
+        log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount());
+    }
+
+    /**
+     * 收到客户端消息后调用的方法
+     *
+     * @param message 客户端发送过来的消息
+     */
+    @OnMessage
+    public void onMessage(String message, Session session) {
+        log.info("用户消息:" + userId + ",报文:" + message);
+        //可以群发消息
+        //消息保存到数据库、redis
+        if (StringUtils.isNotBlank(message)) {
+            try {
+                //解析发送的报文
+                JSONObject jsonObject = JSON.parseObject(message);
+                //追加发送人(防止串改)
+                jsonObject.put("fromUserId", this.userId);
+                String toUserId = jsonObject.getString("toUserId");
+                //传送给对应toUserId用户的websocket
+                if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) {
+                    webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
+                } else {
+                    log.error("请求的userId:" + toUserId + "不在该服务器上");
+                    //否则不在这个服务器上,发送到mysql或者redis
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * @param session
+     * @param error
+     */
+    @OnError
+    public void onError(Session session, Throwable error) {
+        log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
+        error.printStackTrace();
+    }
+
+    /**
+     * 实现服务器主动推送
+     */
+    public void sendMessage(String message) throws IOException {
+        this.session.getBasicRemote().sendText(message);
+    }
+
+
+    /**
+     * 发送自定义消息
+     */
+    public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException {
+        log.info("发送消息到:" + userId + ",报文:" + message);
+        if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
+            webSocketMap.get(userId).sendMessage(message);
+        } else {
+            log.error("用户" + userId + ",不在线!");
+        }
+    }
+
+    public void wxappPhotoReady(){
+        log.info("wxappPhotoReady is come in");
+    }
+
+    public static synchronized int getOnlineCount() {
+        return onlineCount;
+    }
+
+    public static synchronized void addOnlineCount() {
+        WebSocketServer.onlineCount++;
+    }
+
+    public static synchronized void subOnlineCount() {
+        WebSocketServer.onlineCount--;
+    }
+
+//    @Override
+//    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+//        try {
+//            long threadId = Thread.currentThread().getId();
+//            String threadName = Thread.currentThread().getName();
+//            for (MessageExt messageExt : msgs) {
+//                log.info(":{}-:{} websocketConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
+//                MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
+//                log.info(":{}-:{} websocketConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
+//                log.info(":{}-:{} websocketConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
+//                Map map = mqDto.getProperties();
+//                String body = JacksonUtil.parseJson(mqDto.getBody());
+//                log.info("map:{},body:{}", JacksonUtil.parseJson(map), body);
+//                String model = String.valueOf(map.get("model"));
+//                MessageModel messageModel = MessageModel.valueOf(model);
+//                if (messageModel.ordinal() == MessageModel.CLUSTERING.ordinal()) {
+//                    webSocketMap.get(map.get("toUserId")).sendMessage(body);
+//                } else {
+//                    webSocketMap.forEach((k, v) -> {
+//                        try {
+//                            v.sendMessage(body);
+//                        } catch (IOException e) {
+//                            e.printStackTrace();
+//                        }
+//                    });
+//                }
+//            }
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+//        }
+//        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+//    }
+//
+//    @Service
+//    @RocketMQMessageListener(consumerGroup = "websocketConsumerImGroup", topic = "websocketImTopic", selectorType = SelectorType.TAG, selectorExpression = "*")
+//    public class sessionConsumerWeb implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
+//
+//        @Override
+//        public void onMessage(Message message) {
+//            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
+//        }
+//
+//        @Override
+//        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
+//            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
+//            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+//            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
+////            defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
+//            defaultMQPushConsumer.registerMessageListener(WebSocketServer.this::consumeMessage);
+//        }
+//    }
+}
+

+ 2 - 2
themis-backend/src/main/resources/application.properties

@@ -190,5 +190,5 @@ mq.config.userLogConsumerStudentGroup=${mq.config.userLogConsumerGroup}-${mq.con
 prefix.url.admin=api/admin
 prefix.url.admin=api/admin
 
 
 #\u65E0\u9700\u9274\u6743\u7684url
 #\u65E0\u9700\u9274\u6743\u7684url
-no.auth.urls=/webjars/**,/druid/**,/swagger-ui.html,/doc.html,/swagger-resources/**,/v2/api-docs,/webjars/springfox-swagger-ui/**,/api/admin/user/login/account
-common.system.urls=/api/admin/sys/getMenu,/api/admin/user/logout,/api/admin/sys/env,/api/admin/user/list,/api/admin/user/es/list
+no.auth.urls=/webjars/**,/druid/**,/swagger-ui.html,/doc.html,/swagger-resources/**,/v2/api-docs,/webjars/springfox-swagger-ui/**,/api/admin/user/login/account,/html/**
+common.system.urls=/api/admin/sys/getMenu,/api/admin/user/logout,/api/admin/sys/env,/api/admin/user/list,/api/admin/user/es/list,/api/admin/user/websocketPush

+ 8 - 0
themis-mq/src/main/java/com/qmth/themis/mq/service/MqDtoService.java

@@ -19,4 +19,12 @@ public interface MqDtoService {
      * @return
      * @return
      */
      */
     public MqDto assembleSendOneWayMsg(Object... o);
     public MqDto assembleSendOneWayMsg(Object... o);
+
+    /**
+     * 组装单向消息
+     *
+     * @param mqDto
+     * @return
+     */
+    public MqDto assembleSendOneWayMsg(MqDto mqDto);
 }
 }

+ 20 - 0
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqDtoServiceImpl.java

@@ -75,4 +75,24 @@ public class MqDtoServiceImpl implements MqDtoService {
         }
         }
         return mqDto;
         return mqDto;
     }
     }
+
+    /**
+     * 组装单向消息
+     *
+     * @param mqDto
+     * @return
+     */
+    @Override
+    public MqDto assembleSendOneWayMsg(MqDto mqDto) {
+        mqDto.setAck(SystemConstant.DELIVERED_ACK_TYPE);
+        try {
+            producerServer.sendOneWay(mqDto);
+        } catch (Exception e) {
+            e.printStackTrace();
+            if (Objects.nonNull(mqDto)) {
+                mqDto.setAck(SystemConstant.UNSEND_ACK_TYPE);
+            }
+        }
+        return null;
+    }
 }
 }