|
@@ -7,9 +7,12 @@ import com.qmth.themis.business.util.JacksonUtil;
|
|
|
import com.qmth.themis.business.util.RedisUtil;
|
|
|
import com.qmth.themis.common.contanst.Constants;
|
|
|
import com.qmth.themis.mq.templete.Concurrently;
|
|
|
+import com.qmth.themis.mq.templete.Orderly;
|
|
|
import com.qmth.themis.task.listener.service.MqTaskLogicService;
|
|
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -26,11 +29,11 @@ import java.util.Objects;
|
|
|
* @Date: 2020/7/31
|
|
|
*/
|
|
|
@Service
|
|
|
-public class QuartzOrderlyImpl implements Concurrently {
|
|
|
+public class QuartzOrderlyImpl implements Orderly {
|
|
|
private final static Logger log = LoggerFactory.getLogger(QuartzOrderlyImpl.class);
|
|
|
|
|
|
@Override
|
|
|
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
+ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderContext) {
|
|
|
RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
|
|
|
MqTaskLogicService mqTaskLogicService = SpringContextHolder.getBean(MqTaskLogicService.class);
|
|
|
MqDto mqDto = null;
|
|
@@ -48,7 +51,7 @@ public class QuartzOrderlyImpl implements Concurrently {
|
|
|
if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
try {
|
|
|
mqTaskLogicService.execMqQuartzLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
+ return ConsumeOrderlyStatus.SUCCESS;
|
|
|
} finally {
|
|
|
if (Objects.nonNull(mqDto)) {
|
|
|
redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
|
|
@@ -59,8 +62,8 @@ public class QuartzOrderlyImpl implements Concurrently {
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
log.error("mq quartz顺序,消息消费出错", e);
|
|
|
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
+ return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
|
|
|
}
|
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
+ return ConsumeOrderlyStatus.SUCCESS;//成功
|
|
|
}
|
|
|
}
|