|
@@ -1,17 +1,5 @@
|
|
package com.qmth.themis.mq.templete.impl;
|
|
package com.qmth.themis.mq.templete.impl;
|
|
|
|
|
|
-import java.util.List;
|
|
|
|
-import java.util.Map;
|
|
|
|
-import java.util.Objects;
|
|
|
|
-
|
|
|
|
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
|
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
|
-import org.apache.rocketmq.common.message.MessageExt;
|
|
|
|
-import org.slf4j.Logger;
|
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
|
-import org.springframework.stereotype.Service;
|
|
|
|
-import org.springframework.transaction.annotation.Transactional;
|
|
|
|
-
|
|
|
|
import com.google.gson.Gson;
|
|
import com.google.gson.Gson;
|
|
import com.qmth.themis.business.constant.SpringContextHolder;
|
|
import com.qmth.themis.business.constant.SpringContextHolder;
|
|
import com.qmth.themis.business.constant.SystemConstant;
|
|
import com.qmth.themis.business.constant.SystemConstant;
|
|
@@ -21,73 +9,89 @@ import com.qmth.themis.business.service.TOeExamRecordService;
|
|
import com.qmth.themis.business.util.JacksonUtil;
|
|
import com.qmth.themis.business.util.JacksonUtil;
|
|
import com.qmth.themis.business.util.RedisUtil;
|
|
import com.qmth.themis.business.util.RedisUtil;
|
|
import com.qmth.themis.common.contanst.Constants;
|
|
import com.qmth.themis.common.contanst.Constants;
|
|
|
|
+import com.qmth.themis.common.exception.BusinessException;
|
|
import com.qmth.themis.mq.dto.MqDto;
|
|
import com.qmth.themis.mq.dto.MqDto;
|
|
import com.qmth.themis.mq.templete.Concurrently;
|
|
import com.qmth.themis.mq.templete.Concurrently;
|
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
|
+import org.apache.rocketmq.common.message.MessageExt;
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
|
+import org.springframework.transaction.annotation.Transactional;
|
|
|
|
+
|
|
|
|
+import java.util.List;
|
|
|
|
+import java.util.Map;
|
|
|
|
+import java.util.Objects;
|
|
|
|
|
|
/**
|
|
/**
|
|
* 计算客观分
|
|
* 计算客观分
|
|
- *
|
|
|
|
|
|
+ *
|
|
* @Description:
|
|
* @Description:
|
|
* @Author: xiatian
|
|
* @Author: xiatian
|
|
* @Date: 2020-07-30
|
|
* @Date: 2020-07-30
|
|
*/
|
|
*/
|
|
@Service
|
|
@Service
|
|
public class CalculateObjectiveScoreConcurrentlyImpl implements Concurrently {
|
|
public class CalculateObjectiveScoreConcurrentlyImpl implements Concurrently {
|
|
- private final static Logger log = LoggerFactory.getLogger(CalculateObjectiveScoreConcurrentlyImpl.class);
|
|
|
|
|
|
+ private final static Logger log = LoggerFactory.getLogger(CalculateObjectiveScoreConcurrentlyImpl.class);
|
|
|
|
|
|
- private RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
|
|
|
|
- private TOeExamRecordService examRecordService = SpringContextHolder.getBean(TOeExamRecordService.class);
|
|
|
|
- private TMRocketMessageService tmRocketMessageService = SpringContextHolder.getBean(TMRocketMessageService.class);
|
|
|
|
- @Override
|
|
|
|
- @Transactional
|
|
|
|
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
|
|
|
|
- ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
|
-
|
|
|
|
- MqDto mqDto = null;
|
|
|
|
- try {
|
|
|
|
- long threadId = Thread.currentThread().getId();
|
|
|
|
- String threadName = Thread.currentThread().getName();
|
|
|
|
- Gson gson = new Gson();
|
|
|
|
- for (MessageExt messageExt : msgs) {
|
|
|
|
- log.debug(":{}-:{} CalculateObjectiveScore 重试次数:{}", threadId, threadName,
|
|
|
|
- messageExt.getReconsumeTimes());
|
|
|
|
- mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
|
- log.debug(":{}-:{} CalculateObjectiveScore 接收到的消息:{}", threadId, threadName,
|
|
|
|
- JacksonUtil.parseJson(mqDto));
|
|
|
|
- int reconsumeTime = messageExt.getReconsumeTimes();
|
|
|
|
- if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
|
|
|
|
- //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
|
|
|
|
|
|
+ @Override
|
|
|
|
+ @Transactional
|
|
|
|
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
|
|
|
|
+ ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
|
+ RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
|
|
|
|
+ TOeExamRecordService examRecordService = SpringContextHolder.getBean(TOeExamRecordService.class);
|
|
|
|
+ TMRocketMessageService tmRocketMessageService = SpringContextHolder.getBean(TMRocketMessageService.class);
|
|
|
|
+ MqDto mqDto = null;
|
|
|
|
+ try {
|
|
|
|
+ long threadId = Thread.currentThread().getId();
|
|
|
|
+ String threadName = Thread.currentThread().getName();
|
|
|
|
+ Gson gson = new Gson();
|
|
|
|
+ for (MessageExt messageExt : msgs) {
|
|
|
|
+ log.debug(":{}-:{} CalculateObjectiveScore 重试次数:{}", threadId, threadName,
|
|
|
|
+ messageExt.getReconsumeTimes());
|
|
|
|
+ mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
|
+ log.debug(":{}-:{} CalculateObjectiveScore 接收到的消息:{}", threadId, threadName,
|
|
|
|
+ JacksonUtil.parseJson(mqDto));
|
|
|
|
+ int reconsumeTime = messageExt.getReconsumeTimes();
|
|
|
|
+ if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
|
|
|
|
+ //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
|
|
mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
|
|
mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
|
|
TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
- } else {
|
|
|
|
- if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE
|
|
|
|
- && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId()))
|
|
|
|
- && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(),
|
|
|
|
- SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
|
- log.debug(":{}-:{} 更新db", threadId, threadName);
|
|
|
|
- Map<String, Object> param = (Map<String, Object>) mqDto.getBody();
|
|
|
|
- examRecordService.calculateObjectiveScore(param);
|
|
|
|
- mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
|
- TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
|
- tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
|
|
|
|
- tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
|
- redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
|
- } else {
|
|
|
|
- log.debug(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
|
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
|
|
|
|
- } finally {
|
|
|
|
- if (Objects.nonNull(mqDto)) {
|
|
|
|
- redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
|
|
|
|
- }
|
|
|
|
|
|
+ } else {
|
|
|
|
+ if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE
|
|
|
|
+ && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId()))
|
|
|
|
+ && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(),
|
|
|
|
+ SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
|
+ log.debug(":{}-:{} 更新db", threadId, threadName);
|
|
|
|
+ Map<String, Object> param = (Map<String, Object>) mqDto.getBody();
|
|
|
|
+ examRecordService.calculateObjectiveScore(param);
|
|
|
|
+ mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
|
+ TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
|
+ tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
|
|
|
|
+ tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
|
+ redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
|
+ } else {
|
|
|
|
+ log.debug(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ if (e instanceof BusinessException) {
|
|
|
|
+ throw new BusinessException(e.getMessage());
|
|
|
|
+ } else {
|
|
|
|
+ throw new RuntimeException(e);
|
|
|
|
+ }
|
|
|
|
+ } finally {
|
|
|
|
+ if (Objects.nonNull(mqDto)) {
|
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
|
|
|
|
+ }
|
|
}
|
|
}
|