فهرست منبع

延时消息错误fix

xiatian 4 سال پیش
والد
کامیت
a057b72359

+ 1 - 1
themis-business/src/main/java/com/qmth/themis/business/service/impl/CommonServiceImpl.java

@@ -60,7 +60,7 @@ public class CommonServiceImpl implements CommonService {
 				//算分未完成的 发送5秒延迟消息
 				Map<String, Object> transMap = new HashMap<String, Object>();
 				transMap.put("recordId", recordId);
-				String level = "5s";
+				String level = "10s";
 				Integer time = SystemConstant.mqDelayLevel.get(level);
 				LocalDateTime dt = LocalDateTime.now();
 				dt = dt.plusSeconds(Long.parseLong(level.replace("s", "")));

+ 2 - 2
themis-business/src/main/java/com/qmth/themis/business/service/impl/ProducerServerImpl.java

@@ -138,8 +138,8 @@ public class ProducerServerImpl implements ProducerServer {
     @Override
     public Result asyncDelayMsg(MqDto mqDto) {
         log.info("asyncDelayMsg mqDto:{}", JacksonUtil.parseJson(mqDto));
-        Message message = assembleMessage(mqDto);
-        org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(message).build();
+//        Message message = assembleMessage(mqDto);
+        org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(mqDto).build();
         rocketMQTemplate.asyncSend(mqDto.getTopic() + ":" + mqDto.getTag(), messageTran, new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {

+ 14 - 6
themis-business/src/main/java/com/qmth/themis/business/service/impl/TEExamServiceImpl.java

@@ -4,6 +4,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.math.BigDecimal;
 import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -624,7 +626,6 @@ public class TEExamServiceImpl extends ServiceImpl<TEExamMapper, TEExam> impleme
         ret.setStructUrl(OssUtil.getUrlForPrivateBucket(systemConfig.getOssEnv(3), ep.getStructPath()));
         ret.setHasAudio((ep.getHasAudio() != null && ep.getHasAudio().intValue() == 1 ? true : false));
         ret.setAudioPlayCount(ep.getAudioPlayCount());
-        // TODO 9527
         ret.setMonitorKey(ExamRecordCacheUtil.getMonitorKey(recordId));
         ret.setMonitorUserId("s_" + tbSession.getId());
         ret.setMonitorUserSig(tencentYunUtil.getSign(ret.getMonitorUserId(), SystemConstant.TENCENT_EXPIRE_TIME));
@@ -784,11 +785,18 @@ public class TEExamServiceImpl extends ServiceImpl<TEExamMapper, TEExam> impleme
         }
 
         Map<String, Object> transMap = new HashMap<String, Object>();
-        transMap.put("recordId", recordId);
-        // mq发送消息start
-        MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.EXAM_RECORD_PERSISTED.name(), transMap,
-                MqTagEnum.EXAM_RECORD_PERSISTED, recordId.toString(), recordId.toString());
-        mqDtoService.assembleSendOneWayMsg(mqDto);
+		transMap.put("recordId", recordId);
+		String level = "10s";
+		Integer time = SystemConstant.mqDelayLevel.get(level);
+		LocalDateTime dt = LocalDateTime.now();
+		dt = dt.plusSeconds(Long.parseLong(level.replace("s", "")));
+		Map<String, Object> propMap = new HashMap<String, Object>();
+		propMap.put("timeOut", time);
+		propMap.put("mqExecTime", dt.toInstant(ZoneOffset.of("+8")).toEpochMilli());
+		MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.EXAM_RECORD_PERSISTED.name(),
+				transMap, MqTagEnum.EXAM_RECORD_PERSISTED, recordId.toString(), propMap, recordId.toString());
+
+		mqDtoService.assembleSendAsyncDelayMsg(mqDto);
     }
 
     @Cacheable(value = "exam", key = "#examId", unless = "#result == null")