|
@@ -6,6 +6,8 @@ import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.messaging.Message;
|
|
|
+import org.springframework.messaging.support.MessageBuilder;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
@@ -18,18 +20,27 @@ public class ExamReserveMQProducer implements MQConstants {
|
|
|
@Resource
|
|
|
private RocketMQTemplate rocketMQTemplate;
|
|
|
|
|
|
+ @Value("${rocketmq.producer.send-message-timeout:3000}")
|
|
|
+ private Integer mqSendMessageTimeout;
|
|
|
+
|
|
|
@Value("${rocketmq.topic}")
|
|
|
private String mqTopic;
|
|
|
|
|
|
- public void sendMessage(ApplyRecordCacheBean message) {
|
|
|
+ public void sendMessage(ApplyRecordCacheBean bean) {
|
|
|
// 按考点ID分区队列(同个考点下消息按顺序执行)
|
|
|
- String hashKey = String.valueOf(message.getExamSiteId());
|
|
|
+ String hashKey = String.valueOf(bean.getExamSiteId());
|
|
|
|
|
|
+ // 主题:分类
|
|
|
String destination = mqTopic + ":" + TAG_STUDENT_APPLY;
|
|
|
- SendResult result = rocketMQTemplate.syncSendOrderly(destination, message, hashKey);
|
|
|
+
|
|
|
+ Message<ApplyRecordCacheBean> message = MessageBuilder.withPayload(bean).build();
|
|
|
+ // MQ消息延迟级别3 = 10秒
|
|
|
+ int delayLevel = 3;
|
|
|
+
|
|
|
+ SendResult result = rocketMQTemplate.syncSendOrderly(destination, message, hashKey, mqSendMessageTimeout, delayLevel);
|
|
|
log.info("【考生预约队列】消息推送!sendStatus:{} msgId:{} queueOffset:{} {}_{}_{}_{}",
|
|
|
- result.getSendStatus(), result.getMsgId(), result.getQueueOffset(), message.getStudentId(),
|
|
|
- message.getExamSiteId(), message.getTimePeriodId(), message.getCancel());
|
|
|
+ result.getSendStatus(), result.getMsgId(), result.getQueueOffset(), bean.getStudentId(),
|
|
|
+ bean.getExamSiteId(), bean.getTimePeriodId(), bean.getCancel());
|
|
|
}
|
|
|
|
|
|
}
|