|
@@ -19,9 +19,9 @@ import com.qmth.themis.common.exception.BusinessException;
|
|
import com.qmth.themis.exam.config.ExamConstant;
|
|
import com.qmth.themis.exam.config.ExamConstant;
|
|
import com.qmth.themis.exam.listener.service.MqOeLogicService;
|
|
import com.qmth.themis.exam.listener.service.MqOeLogicService;
|
|
import com.qmth.themis.exam.websocketTemplete.WebSocketOeMessageTemplete;
|
|
import com.qmth.themis.exam.websocketTemplete.WebSocketOeMessageTemplete;
|
|
-import com.qmth.themis.mq.templete.Orderly;
|
|
|
|
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
|
|
|
|
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
|
|
|
|
|
+import com.qmth.themis.mq.templete.Concurrently;
|
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -47,7 +47,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
*/
|
|
*/
|
|
@ServerEndpoint("/ws/oe")
|
|
@ServerEndpoint("/ws/oe")
|
|
@Component
|
|
@Component
|
|
-public class WebSocketOeServer implements Orderly {
|
|
|
|
|
|
+public class WebSocketOeServer implements Concurrently {
|
|
private final static Logger log = LoggerFactory.getLogger(WebSocketOeServer.class);
|
|
private final static Logger log = LoggerFactory.getLogger(WebSocketOeServer.class);
|
|
private volatile static ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = new ConcurrentHashMap<>();
|
|
private volatile static ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = new ConcurrentHashMap<>();
|
|
/**
|
|
/**
|
|
@@ -264,7 +264,8 @@ public class WebSocketOeServer implements Orderly {
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
|
|
|
|
|
|
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext
|
|
|
|
+ consumeConcurrentlyContext) {
|
|
MqOeLogicService mqOeLogicService = SpringContextHolder.getBean(MqOeLogicService.class);
|
|
MqOeLogicService mqOeLogicService = SpringContextHolder.getBean(MqOeLogicService.class);
|
|
try {
|
|
try {
|
|
long threadId = Thread.currentThread().getId();
|
|
long threadId = Thread.currentThread().getId();
|
|
@@ -274,13 +275,13 @@ public class WebSocketOeServer implements Orderly {
|
|
MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
log.info(":{}-:{} websocket oe Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
log.info(":{}-:{} websocket oe Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
mqOeLogicService.execMqOeLogic(mqDto, SystemConstant.MQ_BROADCAST_TOPIC_BUFFER_LIST);
|
|
mqOeLogicService.execMqOeLogic(mqDto, SystemConstant.MQ_BROADCAST_TOPIC_BUFFER_LIST);
|
|
- return ConsumeOrderlyStatus.SUCCESS;
|
|
|
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
log.error("mq websocket oe,消息消费出错", e);
|
|
log.error("mq websocket oe,消息消费出错", e);
|
|
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
|
|
|
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
}
|
|
}
|
|
- return ConsumeOrderlyStatus.SUCCESS;//成功
|
|
|
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
}
|
|
}
|
|
|
|
|
|
public static ConcurrentHashMap<String, WebSocketOeServer> getWebSocketMap() {
|
|
public static ConcurrentHashMap<String, WebSocketOeServer> getWebSocketMap() {
|
|
@@ -290,5 +291,4 @@ public class WebSocketOeServer implements Orderly {
|
|
public Long getRecordId() {
|
|
public Long getRecordId() {
|
|
return recordId;
|
|
return recordId;
|
|
}
|
|
}
|
|
-}
|
|
|
|
-
|
|
|
|
|
|
+}
|