wangliang 4 years ago
parent
commit
0860c15e1f

+ 6 - 6
themis-admin/src/main/java/com/qmth/themis/admin/websocket/WebSocketAdminServer.java

@@ -37,7 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
 @ServerEndpoint("/ws/admin")
 @Component
 public class WebSocketAdminServer
-//        implements Concurrently
+//        implements Orderly
 {
     private final static Logger log = LoggerFactory.getLogger(WebSocketAdminServer.class);
     public volatile static ConcurrentHashMap<Long, WebSocketAdminServer> webSocketMap = new ConcurrentHashMap<>();
@@ -216,7 +216,7 @@ public class WebSocketAdminServer
     }
 
 //    @Override
-//    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+//    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
 //        RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
 //        MqAdminLogicService mqAdminLogicService = SpringContextHolder.getBean(MqAdminLogicService.class);
 //        MqDto mqDto = null;
@@ -233,23 +233,23 @@ public class WebSocketAdminServer
 //                } else {
 //                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
 //                        mqAdminLogicService.execMqAdminLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
-//                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+//                        return ConsumeOrderlyStatus.SUCCESS;
 //                    } else {
 //                        log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
-//                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+//                        return ConsumeConcurrentlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
 //                    }
 //                }
 //            }
 //        } catch (Exception e) {
 //            log.error("mq websocket admin,消息消费出错", e);
 //            e.printStackTrace();
-//            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+//            return ConsumeOrderlyStatus.RECONSUME_LATER;//重试
 //        } finally {
 //            if (Objects.nonNull(mqDto)) {
 //                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
 //            }
 //        }
-//        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+//        return ConsumeOrderlyStatus.SUCCESS;//成功
 //    }
 
     public static ConcurrentHashMap<Long, WebSocketAdminServer> getWebSocketMap() {

+ 9 - 9
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketMobileServer.java

@@ -18,9 +18,9 @@ import com.qmth.themis.common.enums.Platform;
 import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.exam.listener.service.MqOeLogicService;
 import com.qmth.themis.exam.websocketTemplete.WebSocketMobileMessageTemplete;
-import com.qmth.themis.mq.templete.Concurrently;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import com.qmth.themis.mq.templete.Orderly;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +46,7 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 @ServerEndpoint("/ws/mobile")
 @Component
-public class WebSocketMobileServer implements Concurrently {
+public class WebSocketMobileServer implements Orderly {
     private final static Logger log = LoggerFactory.getLogger(WebSocketMobileServer.class);
     private volatile static ConcurrentHashMap<String, WebSocketMobileServer> webSocketMap = new ConcurrentHashMap<>();
     /**
@@ -249,7 +249,7 @@ public class WebSocketMobileServer implements Concurrently {
     }
 
     @Override
-    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
         MqOeLogicService mqOeLogicService = SpringContextHolder.getBean(MqOeLogicService.class);
         try {
             long threadId = Thread.currentThread().getId();
@@ -259,12 +259,12 @@ public class WebSocketMobileServer implements Concurrently {
                 MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                 log.info(":{}-:{} websocket oe Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
                 mqOeLogicService.execMqOeMobileLogic(mqDto, SystemConstant.MQ_BROADCAST_TOPIC_BUFFER_LIST);
-                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                return ConsumeOrderlyStatus.SUCCESS;
             }
         } catch (Exception e) {
-            log.error("mq websocket oe,消息消费出错", e);
-            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+            log.error("mq websocket mobile,消息消费出错", e);
+            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
         }
-        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+        return ConsumeOrderlyStatus.SUCCESS;//成功
     }
 }

+ 8 - 9
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketOeServer.java

@@ -19,9 +19,9 @@ import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.exam.config.ExamConstant;
 import com.qmth.themis.exam.listener.service.MqOeLogicService;
 import com.qmth.themis.exam.websocketTemplete.WebSocketOeMessageTemplete;
-import com.qmth.themis.mq.templete.Concurrently;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import com.qmth.themis.mq.templete.Orderly;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +47,7 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 @ServerEndpoint("/ws/oe")
 @Component
-public class WebSocketOeServer implements Concurrently {
+public class WebSocketOeServer implements Orderly {
     private final static Logger log = LoggerFactory.getLogger(WebSocketOeServer.class);
     private volatile static ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = new ConcurrentHashMap<>();
     /**
@@ -264,8 +264,7 @@ public class WebSocketOeServer implements Concurrently {
     }
 
     @Override
-    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext
-            consumeConcurrentlyContext) {
+    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
         MqOeLogicService mqOeLogicService = SpringContextHolder.getBean(MqOeLogicService.class);
         try {
             long threadId = Thread.currentThread().getId();
@@ -275,13 +274,13 @@ public class WebSocketOeServer implements Concurrently {
                 MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                 log.info(":{}-:{} websocket oe Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
                 mqOeLogicService.execMqOeLogic(mqDto, SystemConstant.MQ_BROADCAST_TOPIC_BUFFER_LIST);
-                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                return ConsumeOrderlyStatus.SUCCESS;
             }
         } catch (Exception e) {
             log.error("mq websocket oe,消息消费出错", e);
-            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
         }
-        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+        return ConsumeOrderlyStatus.SUCCESS;//成功
     }
 
     public static ConcurrentHashMap<String, WebSocketOeServer> getWebSocketMap() {

+ 9 - 9
themis-task/src/main/java/com/qmth/themis/task/listener/QuartzOrderlyImpl.java

@@ -6,10 +6,10 @@ import com.qmth.themis.business.dto.MqDto;
 import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.common.contanst.Constants;
-import com.qmth.themis.mq.templete.Concurrently;
+import com.qmth.themis.mq.templete.Orderly;
 import com.qmth.themis.task.listener.service.MqTaskLogicService;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,11 +26,11 @@ import java.util.Objects;
  * @Date: 2020/7/31
  */
 @Service
-public class QuartzOrderlyImpl implements Concurrently {
+public class QuartzOrderlyImpl implements Orderly {
     private final static Logger log = LoggerFactory.getLogger(QuartzOrderlyImpl.class);
 
     @Override
-    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
         RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
         MqTaskLogicService mqTaskLogicService = SpringContextHolder.getBean(MqTaskLogicService.class);
         MqDto mqDto = null;
@@ -48,7 +48,7 @@ public class QuartzOrderlyImpl implements Concurrently {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
                         try {
                             mqTaskLogicService.execMqQuartzLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
-                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                            return ConsumeOrderlyStatus.SUCCESS;
                         } finally {
                             if (Objects.nonNull(mqDto)) {
                                 redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
@@ -59,8 +59,8 @@ public class QuartzOrderlyImpl implements Concurrently {
             }
         } catch (Exception e) {
             log.error("mq quartz顺序,消息消费出错", e);
-            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
         }
-        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+        return ConsumeOrderlyStatus.SUCCESS;//成功
     }
-}
+}