|
@@ -1,24 +1,6 @@
|
|
|
package cn.com.qmth.examcloud.core.reports.api.listener;
|
|
|
|
|
|
-import java.util.Properties;
|
|
|
-
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
-
|
|
|
-import com.alibaba.fastjson.JSON;
|
|
|
-import com.aliyun.openservices.ons.api.Action;
|
|
|
-import com.aliyun.openservices.ons.api.ConsumeContext;
|
|
|
-import com.aliyun.openservices.ons.api.Consumer;
|
|
|
-import com.aliyun.openservices.ons.api.Message;
|
|
|
-import com.aliyun.openservices.ons.api.MessageListener;
|
|
|
-import com.aliyun.openservices.ons.api.ONSFactory;
|
|
|
-import com.aliyun.openservices.ons.api.PropertyKeyConst;
|
|
|
-
|
|
|
-import cn.com.qmth.examcloud.core.reports.base.util.online.ActiveDataUtil;
|
|
|
-import cn.com.qmth.examcloud.core.reports.base.util.online.ExamStudentActive;
|
|
|
-import cn.com.qmth.examcloud.core.reports.base.util.online.StudentActive;
|
|
|
-import cn.com.qmth.examcloud.core.reports.base.util.online.StudentLogin;
|
|
|
-import cn.com.qmth.examcloud.core.reports.base.util.online.UserActive;
|
|
|
+import cn.com.qmth.examcloud.core.reports.base.util.online.*;
|
|
|
import cn.com.qmth.examcloud.core.reports.service.OperateService;
|
|
|
import cn.com.qmth.examcloud.reports.commons.bean.OnlineExamStudentReport;
|
|
|
import cn.com.qmth.examcloud.reports.commons.bean.OnlineStudentReport;
|
|
@@ -28,6 +10,12 @@ import cn.com.qmth.examcloud.reports.commons.enums.Tag;
|
|
|
import cn.com.qmth.examcloud.reports.commons.util.ReportsUtil;
|
|
|
import cn.com.qmth.examcloud.web.bootstrap.PropertyHolder;
|
|
|
import cn.com.qmth.examcloud.web.support.SpringContextHolder;
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.aliyun.openservices.ons.api.*;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+import java.util.Properties;
|
|
|
|
|
|
public class RocketMqConsumerListener {
|
|
|
|
|
@@ -36,14 +24,15 @@ public class RocketMqConsumerListener {
|
|
|
private static OperateService operateService = SpringContextHolder.getBean(OperateService.class);
|
|
|
|
|
|
private static Properties properties = new Properties();
|
|
|
+
|
|
|
static {
|
|
|
- properties.put(PropertyKeyConst.AccessKey, PropertyHolder.getString("$rocketmq-accesskey"));
|
|
|
+ properties.put(PropertyKeyConst.AccessKey, PropertyHolder.getString("examcloud.rocketmq.accesskey"));
|
|
|
// AccessKeySecret 阿里云身份验证,在阿里云服务器管理控制台创建。
|
|
|
- properties.put(PropertyKeyConst.SecretKey, PropertyHolder.getString("$rocketmq-secretkey"));
|
|
|
+ properties.put(PropertyKeyConst.SecretKey, PropertyHolder.getString("examcloud.rocketmq.secretkey"));
|
|
|
// 设置发送超时时间,单位毫秒。
|
|
|
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
|
|
|
// 设置 TCP 接入域名,进入控制台的实例详情页面的 TCP 协议客户端接入点区域查看。
|
|
|
- properties.put(PropertyKeyConst.NAMESRV_ADDR, PropertyHolder.getString("$rocketmq-namesrv-addr"));
|
|
|
+ properties.put(PropertyKeyConst.NAMESRV_ADDR, PropertyHolder.getString("examcloud.rocketmq.namesrv_addr"));
|
|
|
// 顺序消息消费失败进行重试前的等待时间,单位(毫秒),取值范围: 10 毫秒 ~ 30,000 毫秒
|
|
|
properties.put(PropertyKeyConst.SuspendTimeMillis, "100");
|
|
|
// 消息消费失败时的最大重试次数
|
|
@@ -58,9 +47,9 @@ public class RocketMqConsumerListener {
|
|
|
}
|
|
|
|
|
|
private static void onlineExamStudent() {
|
|
|
- properties.put(PropertyKeyConst.GROUP_ID, Tag.ONLINE_EXAM_STUDENT.getGroup());
|
|
|
+ properties.put(PropertyKeyConst.GROUP_ID, ReportsUtil.curConsumerGroup(Tag.ONLINE_EXAM_STUDENT));
|
|
|
Consumer consumer = ONSFactory.createConsumer(properties);
|
|
|
- consumer.subscribe(ReportsUtil.getReportTopic(), Tag.ONLINE_EXAM_STUDENT.getCode(), new MessageListener() {
|
|
|
+ consumer.subscribe(ReportsUtil.curTopic(Tag.ONLINE_EXAM_STUDENT), Tag.ONLINE_EXAM_STUDENT.name(), new MessageListener() {
|
|
|
|
|
|
@Override
|
|
|
public Action consume(Message message, ConsumeContext context) {
|
|
@@ -73,16 +62,15 @@ public class RocketMqConsumerListener {
|
|
|
return Action.ReconsumeLater;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
});
|
|
|
|
|
|
consumer.start();
|
|
|
}
|
|
|
|
|
|
private static void onlineStudent() {
|
|
|
- properties.put(PropertyKeyConst.GROUP_ID, Tag.ONLINE_STUDENT.getGroup());
|
|
|
+ properties.put(PropertyKeyConst.GROUP_ID, ReportsUtil.curConsumerGroup(Tag.ONLINE_STUDENT));
|
|
|
Consumer consumer = ONSFactory.createConsumer(properties);
|
|
|
- consumer.subscribe(ReportsUtil.getReportTopic(), Tag.ONLINE_STUDENT.getCode(), new MessageListener() {
|
|
|
+ consumer.subscribe(ReportsUtil.curTopic(Tag.ONLINE_STUDENT), Tag.ONLINE_STUDENT.name(), new MessageListener() {
|
|
|
|
|
|
@Override
|
|
|
public Action consume(Message message, ConsumeContext context) {
|
|
@@ -95,16 +83,15 @@ public class RocketMqConsumerListener {
|
|
|
return Action.ReconsumeLater;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
});
|
|
|
|
|
|
consumer.start();
|
|
|
}
|
|
|
|
|
|
private static void onlineUser() {
|
|
|
- properties.put(PropertyKeyConst.GROUP_ID, Tag.ONLINE_USER.getGroup());
|
|
|
+ properties.put(PropertyKeyConst.GROUP_ID, ReportsUtil.curConsumerGroup(Tag.ONLINE_USER));
|
|
|
Consumer consumer = ONSFactory.createConsumer(properties);
|
|
|
- consumer.subscribe(ReportsUtil.getReportTopic(), Tag.ONLINE_USER.getCode(), new MessageListener() {
|
|
|
+ consumer.subscribe(ReportsUtil.curTopic(Tag.ONLINE_USER), Tag.ONLINE_USER.name(), new MessageListener() {
|
|
|
|
|
|
@Override
|
|
|
public Action consume(Message message, ConsumeContext context) {
|
|
@@ -117,16 +104,15 @@ public class RocketMqConsumerListener {
|
|
|
return Action.ReconsumeLater;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
});
|
|
|
|
|
|
consumer.start();
|
|
|
}
|
|
|
|
|
|
private static void operateInfo() {
|
|
|
- properties.put(PropertyKeyConst.GROUP_ID, Tag.OPERATE_INFO.getGroup());
|
|
|
+ properties.put(PropertyKeyConst.GROUP_ID, ReportsUtil.curConsumerGroup(Tag.OPERATE_INFO));
|
|
|
Consumer consumer = ONSFactory.createConsumer(properties);
|
|
|
- consumer.subscribe(ReportsUtil.getReportTopic(), Tag.OPERATE_INFO.getCode(), new MessageListener() {
|
|
|
+ consumer.subscribe(ReportsUtil.curTopic(Tag.OPERATE_INFO), Tag.OPERATE_INFO.name(), new MessageListener() {
|
|
|
|
|
|
@Override
|
|
|
public Action consume(Message message, ConsumeContext context) {
|
|
@@ -139,7 +125,6 @@ public class RocketMqConsumerListener {
|
|
|
return Action.ReconsumeLater;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
});
|
|
|
|
|
|
consumer.start();
|
|
@@ -196,4 +181,5 @@ public class RocketMqConsumerListener {
|
|
|
OperateReport r = JSON.parseObject(message, OperateReport.class);
|
|
|
operateService.saveOperate(r, msgId);
|
|
|
}
|
|
|
-}
|
|
|
+
|
|
|
+}
|