|
@@ -6,12 +6,15 @@ import com.qmth.themis.business.constant.SpringContextHolder;
|
|
import com.qmth.themis.business.constant.SystemConstant;
|
|
import com.qmth.themis.business.constant.SystemConstant;
|
|
import com.qmth.themis.business.dto.WebsocketDto;
|
|
import com.qmth.themis.business.dto.WebsocketDto;
|
|
import com.qmth.themis.business.entity.TBSession;
|
|
import com.qmth.themis.business.entity.TBSession;
|
|
|
|
+import com.qmth.themis.business.entity.TMRocketMessage;
|
|
import com.qmth.themis.business.enums.MqEnum;
|
|
import com.qmth.themis.business.enums.MqEnum;
|
|
import com.qmth.themis.business.enums.SystemOperationEnum;
|
|
import com.qmth.themis.business.enums.SystemOperationEnum;
|
|
import com.qmth.themis.business.enums.WebsocketTypeEnum;
|
|
import com.qmth.themis.business.enums.WebsocketTypeEnum;
|
|
|
|
+import com.qmth.themis.business.service.TMRocketMessageService;
|
|
import com.qmth.themis.business.util.JacksonUtil;
|
|
import com.qmth.themis.business.util.JacksonUtil;
|
|
import com.qmth.themis.business.util.RedisUtil;
|
|
import com.qmth.themis.business.util.RedisUtil;
|
|
import com.qmth.themis.business.util.WebsocketUtil;
|
|
import com.qmth.themis.business.util.WebsocketUtil;
|
|
|
|
+import com.qmth.themis.common.contanst.Constants;
|
|
import com.qmth.themis.common.enums.ExceptionResultEnum;
|
|
import com.qmth.themis.common.enums.ExceptionResultEnum;
|
|
import com.qmth.themis.common.exception.BusinessException;
|
|
import com.qmth.themis.common.exception.BusinessException;
|
|
import com.qmth.themis.common.signature.SignatureInfo;
|
|
import com.qmth.themis.common.signature.SignatureInfo;
|
|
@@ -21,9 +24,22 @@ import com.qmth.themis.mq.dto.MqDto;
|
|
import com.qmth.themis.mq.enums.MqTagEnum;
|
|
import com.qmth.themis.mq.enums.MqTagEnum;
|
|
import com.qmth.themis.mq.enums.MqTopicEnum;
|
|
import com.qmth.themis.mq.enums.MqTopicEnum;
|
|
import com.qmth.themis.mq.service.MqDtoService;
|
|
import com.qmth.themis.mq.service.MqDtoService;
|
|
|
|
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
|
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
|
|
|
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
|
|
|
+import org.apache.rocketmq.common.message.Message;
|
|
|
|
+import org.apache.rocketmq.common.message.MessageExt;
|
|
|
|
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
|
|
|
|
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
|
|
+import org.apache.rocketmq.spring.annotation.SelectorType;
|
|
|
|
+import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
|
|
+import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
|
|
|
import javax.websocket.*;
|
|
import javax.websocket.*;
|
|
import javax.websocket.server.PathParam;
|
|
import javax.websocket.server.PathParam;
|
|
@@ -46,8 +62,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
@ServerEndpoint("/oe")
|
|
@ServerEndpoint("/oe")
|
|
@Component
|
|
@Component
|
|
public class WebSocketOeServer
|
|
public class WebSocketOeServer
|
|
-// implements MessageListenerConcurrently
|
|
|
|
-{
|
|
|
|
|
|
+ implements MessageListenerConcurrently {
|
|
private final static Logger log = LoggerFactory.getLogger(WebSocketOeServer.class);
|
|
private final static Logger log = LoggerFactory.getLogger(WebSocketOeServer.class);
|
|
private volatile static ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = new ConcurrentHashMap<>();
|
|
private volatile static ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = new ConcurrentHashMap<>();
|
|
/**
|
|
/**
|
|
@@ -109,7 +124,6 @@ public class WebSocketOeServer
|
|
addOnlineCount();
|
|
addOnlineCount();
|
|
//在线数加1
|
|
//在线数加1
|
|
}
|
|
}
|
|
- redisUtil.delete(SystemConstant.WEBSOCKET_UN_NORMAL_LIST, this.sessionId);
|
|
|
|
//发送恢复网络mq消息
|
|
//发送恢复网络mq消息
|
|
log.info("用户连接:" + this.sessionId + ",当前在线人数为:" + getOnlineCount());
|
|
log.info("用户连接:" + this.sessionId + ",当前在线人数为:" + getOnlineCount());
|
|
try {
|
|
try {
|
|
@@ -149,7 +163,6 @@ public class WebSocketOeServer
|
|
//大于等于超时时间,说明规定时间内都没有通信,非正常退出,因为期间会有心跳更新updateTime
|
|
//大于等于超时时间,说明规定时间内都没有通信,非正常退出,因为期间会有心跳更新updateTime
|
|
if ((now.getTime() - this.updateTime) / 1000 >= SystemConstant.WEBSOCKET_MAX_TIME_OUT / 1000) {
|
|
if ((now.getTime() - this.updateTime) / 1000 >= SystemConstant.WEBSOCKET_MAX_TIME_OUT / 1000) {
|
|
log.info("超时退出");
|
|
log.info("超时退出");
|
|
- redisUtil.set(SystemConstant.WEBSOCKET_UN_NORMAL_LIST, this.sessionId, this.sessionId);
|
|
|
|
//发送延时mq消息start
|
|
//发送延时mq消息start
|
|
MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
|
|
MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
|
|
String level = "2m";
|
|
String level = "2m";
|
|
@@ -158,7 +171,7 @@ public class WebSocketOeServer
|
|
dt = dt.plusMinutes(Long.parseLong(level.replace("m", "")));
|
|
dt = dt.plusMinutes(Long.parseLong(level.replace("m", "")));
|
|
tranMap.put("timeOut", time);
|
|
tranMap.put("timeOut", time);
|
|
tranMap.put("mqExecTime", dt.toInstant(ZoneOffset.of("+8")).toEpochMilli());
|
|
tranMap.put("mqExecTime", dt.toInstant(ZoneOffset.of("+8")).toEpochMilli());
|
|
- MqDto mqDto = new MqDto(MqTopicEnum.websocketUnNormalTopic.getCode(), MqTagEnum.oe.name(), SystemOperationEnum.OE_NET_UN_NORMAL, MqEnum.WEBSOCKET_UN_NORMAL_LOG, String.valueOf(this.recordId), this.tranMap, this.sessionId);
|
|
|
|
|
|
+ MqDto mqDto = new MqDto(MqTopicEnum.websocketTopic.getCode(), MqTagEnum.unNormal.name(), SystemOperationEnum.OE_NET_UN_NORMAL, MqEnum.WEBSOCKET_UN_NORMAL_LOG, String.valueOf(this.recordId), this.tranMap, this.sessionId);
|
|
mqDtoService.assembleSendAsyncDelayMsg(mqDto);
|
|
mqDtoService.assembleSendAsyncDelayMsg(mqDto);
|
|
//发送延时mq消息end
|
|
//发送延时mq消息end
|
|
}
|
|
}
|
|
@@ -275,12 +288,42 @@ public class WebSocketOeServer
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-// @Override
|
|
|
|
-// public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
|
-// try {
|
|
|
|
-// long threadId = Thread.currentThread().getId();
|
|
|
|
-// String threadName = Thread.currentThread().getName();
|
|
|
|
-// for (MessageExt messageExt : msgs) {
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
|
+ RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
|
|
|
|
+ TMRocketMessageService tmRocketMessageService = SpringContextHolder.getBean(TMRocketMessageService.class);
|
|
|
|
+ MqDto mqDto = null;
|
|
|
|
+ try {
|
|
|
|
+ long threadId = Thread.currentThread().getId();
|
|
|
|
+ String threadName = Thread.currentThread().getName();
|
|
|
|
+ Gson gson = new Gson();
|
|
|
|
+ for (MessageExt messageExt : msgs) {
|
|
|
|
+ log.info(":{}-:{} websocket oe Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
|
+ mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
|
+ log.info(":{}-:{} websocket oe Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
|
+ int reconsumeTime = messageExt.getReconsumeTimes();
|
|
|
|
+ if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
|
|
|
|
+ //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
|
|
|
|
+ mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
|
|
|
|
+ TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
|
+ tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
|
+ redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
|
+ } else {
|
|
|
|
+ if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
|
+ String body = new String(messageExt.getBody(), Constants.CHARSET_NAME);
|
|
|
|
+ log.info("body:{}", body);
|
|
|
|
+ JSONObject jsonObject = JSONObject.parseObject(body);
|
|
|
|
+ Map properties = (Map) jsonObject.get("properties");
|
|
|
|
+ String oper = String.valueOf(properties.get("oper"));
|
|
|
|
+ if (oper.contains("offLine")) {//下线
|
|
|
|
+
|
|
|
|
+ } else if (oper.contains(MessageModel.CLUSTERING.name())) {//点对点消息
|
|
|
|
+
|
|
|
|
+ } else if (oper.contains(MessageModel.BROADCASTING.name())) {//广播消息
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
// log.info(":{}-:{} websocketConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
// log.info(":{}-:{} websocketConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
// MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
// MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
// log.info(":{}-:{} websocketConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
// log.info(":{}-:{} websocketConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
@@ -301,31 +344,37 @@ public class WebSocketOeServer
|
|
// }
|
|
// }
|
|
// });
|
|
// });
|
|
// }
|
|
// }
|
|
-// }
|
|
|
|
-// } catch (Exception e) {
|
|
|
|
-// e.printStackTrace();
|
|
|
|
-// return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
|
-// }
|
|
|
|
-// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
|
-// }
|
|
|
|
-//
|
|
|
|
-// @Service
|
|
|
|
-// @RocketMQMessageListener(consumerGroup = "websocketConsumerImGroup", topic = "websocketImTopic", selectorType = SelectorType.TAG, selectorExpression = "*")
|
|
|
|
-// public class sessionConsumerWeb implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
|
-//
|
|
|
|
-// @Override
|
|
|
|
-// public void onMessage(Message message) {
|
|
|
|
-// //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
|
|
|
|
-// }
|
|
|
|
-//
|
|
|
|
-// @Override
|
|
|
|
-// public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
|
-// defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
|
-// defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
|
-// defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
|
-//// defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
|
|
|
|
-// defaultMQPushConsumer.registerMessageListener(WebSocketServer.this::consumeMessage);
|
|
|
|
-// }
|
|
|
|
-// }
|
|
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
|
+ } finally {
|
|
|
|
+ if (Objects.nonNull(mqDto)) {
|
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * websocket oe
|
|
|
|
+ */
|
|
|
|
+ @Service
|
|
|
|
+ @RocketMQMessageListener(consumerGroup = "${mq.config.websocketConsumerOeGroup}", topic = "${mq.config.websocketTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.websocketTopicOeTag}")
|
|
|
|
+ public class taskConsumerQuartzStudent implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void onMessage(Message message) {
|
|
|
|
+ //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
|
+ defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
|
+ defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
|
+ defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
|
+ defaultMQPushConsumer.registerMessageListener(WebSocketOeServer.this::consumeMessage);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|