|
@@ -15,6 +15,7 @@ import com.qmth.themis.common.contanst.Constants;
|
|
|
import com.qmth.themis.common.exception.BusinessException;
|
|
|
import com.qmth.themis.mq.dto.MqDto;
|
|
|
import com.qmth.themis.mq.templete.Concurrently;
|
|
|
+import com.qmth.themis.mq.templete.ExecMqLogic;
|
|
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
@@ -36,44 +37,34 @@ import java.util.Objects;
|
|
|
@Service
|
|
|
public class UserLogConcurrentlyImpl implements Concurrently {
|
|
|
private final static Logger log = LoggerFactory.getLogger(UserLogConcurrentlyImpl.class);
|
|
|
+ private RedisUtil redisUtil = null;
|
|
|
+ private TEUserLogService teUserLogService = null;
|
|
|
+ private TEExamStudentLogService teExamStudentLogService = null;
|
|
|
+ private TMRocketMessageService tmRocketMessageService = null;
|
|
|
+ private Gson gson = null;
|
|
|
+ private MqDto mqDto = null;
|
|
|
|
|
|
@Override
|
|
|
- @Transactional
|
|
|
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
- RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
|
|
|
- TEUserLogService teUserLogService = SpringContextHolder.getBean(TEUserLogService.class);
|
|
|
- TEExamStudentLogService teExamStudentLogService = SpringContextHolder.getBean(TEExamStudentLogService.class);
|
|
|
- TMRocketMessageService tmRocketMessageService = SpringContextHolder.getBean(TMRocketMessageService.class);
|
|
|
- MqDto mqDto = null;
|
|
|
+ redisUtil = SpringContextHolder.getBean(RedisUtil.class);
|
|
|
+ teUserLogService = SpringContextHolder.getBean(TEUserLogService.class);
|
|
|
+ teExamStudentLogService = SpringContextHolder.getBean(TEExamStudentLogService.class);
|
|
|
+ tmRocketMessageService = SpringContextHolder.getBean(TMRocketMessageService.class);
|
|
|
try {
|
|
|
long threadId = Thread.currentThread().getId();
|
|
|
String threadName = Thread.currentThread().getName();
|
|
|
- Gson gson = new Gson();
|
|
|
+ gson = new Gson();
|
|
|
for (MessageExt messageExt : msgs) {
|
|
|
mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
log.info(":{}-:{} logConsumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
-// MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
|
|
|
log.info(":{}-:{} logConsumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
int reconsumeTime = messageExt.getReconsumeTimes();
|
|
|
if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
|
|
|
- //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
|
|
|
- mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
|
|
|
- TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
- tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
- redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
+ this.execMqMaxReconsumeTime(SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
} else {
|
|
|
if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
log.info(":{}-:{} 插入用户轨迹日志", threadId, threadName);
|
|
|
- String tag = mqDto.getTag();
|
|
|
- if (tag.contains("user")) {
|
|
|
- teUserLogService.saveUserLogInfo(mqDto.getTimestamp(), mqDto.getObjId(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
|
|
|
- } else if (tag.contains("student")) {
|
|
|
- teExamStudentLogService.saveStudentLogInfo(mqDto.getTimestamp(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
|
|
|
- }
|
|
|
- mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
- TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
- tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
- redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
+ this.execMqLogic();
|
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
} else {
|
|
|
log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
@@ -95,4 +86,28 @@ public class UserLogConcurrentlyImpl implements Concurrently {
|
|
|
}
|
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
}
|
|
|
+
|
|
|
+ @Transactional
|
|
|
+ public void execMqMaxReconsumeTime(String key) {
|
|
|
+ //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
|
|
|
+ mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
|
|
|
+ TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
+ tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
+ redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Transactional
|
|
|
+ public void execMqLogic() {
|
|
|
+ String tag = mqDto.getTag();
|
|
|
+ if (tag.contains("user")) {
|
|
|
+ teUserLogService.saveUserLogInfo(mqDto.getTimestamp(), mqDto.getObjId(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
|
|
|
+ } else if (tag.contains("student")) {
|
|
|
+ teExamStudentLogService.saveStudentLogInfo(mqDto.getTimestamp(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
|
|
|
+ }
|
|
|
+ mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
+ TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
+ tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
+ redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
+ int i = 1/0;
|
|
|
+ }
|
|
|
}
|