|
@@ -25,170 +25,175 @@ import cn.com.qmth.examcloud.reports.commons.bean.OnlineStudentReport;
|
|
|
import cn.com.qmth.examcloud.reports.commons.bean.OnlineUserReport;
|
|
|
import cn.com.qmth.examcloud.reports.commons.bean.OperateReport;
|
|
|
import cn.com.qmth.examcloud.reports.commons.enums.Tag;
|
|
|
-import cn.com.qmth.examcloud.reports.commons.enums.Topic;
|
|
|
+import cn.com.qmth.examcloud.reports.commons.util.ReportsUtil;
|
|
|
import cn.com.qmth.examcloud.web.bootstrap.PropertyHolder;
|
|
|
import cn.com.qmth.examcloud.web.support.SpringContextHolder;
|
|
|
|
|
|
public class RocketMqConsumerListener {
|
|
|
- private final static Logger LOG = LoggerFactory.getLogger(RocketMqConsumerListener.class);
|
|
|
-
|
|
|
- private static OperateService operateService = SpringContextHolder.getBean(OperateService.class);
|
|
|
-
|
|
|
- private static Properties properties = new Properties();
|
|
|
- static {
|
|
|
- properties.put(PropertyKeyConst.AccessKey, PropertyHolder.getString("$rocketmq-accesskey"));
|
|
|
- // AccessKeySecret 阿里云身份验证,在阿里云服务器管理控制台创建。
|
|
|
- properties.put(PropertyKeyConst.SecretKey, PropertyHolder.getString("$rocketmq-secretkey"));
|
|
|
- // 设置发送超时时间,单位毫秒。
|
|
|
- properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
|
|
|
- // 设置 TCP 接入域名,进入控制台的实例详情页面的 TCP 协议客户端接入点区域查看。
|
|
|
- properties.put(PropertyKeyConst.NAMESRV_ADDR, PropertyHolder.getString("$rocketmq-namesrv-addr"));
|
|
|
- // 顺序消息消费失败进行重试前的等待时间,单位(毫秒),取值范围: 10 毫秒 ~ 30,000 毫秒
|
|
|
- properties.put(PropertyKeyConst.SuspendTimeMillis, "100");
|
|
|
- // 消息消费失败时的最大重试次数
|
|
|
- properties.put(PropertyKeyConst.MaxReconsumeTimes, "10");
|
|
|
- }
|
|
|
-
|
|
|
- public static void start() {
|
|
|
- onlineExamStudent();
|
|
|
- onlineStudent();
|
|
|
- onlineUser();
|
|
|
- operateInfo();
|
|
|
- }
|
|
|
-
|
|
|
- private static void onlineExamStudent() {
|
|
|
- properties.put(PropertyKeyConst.GROUP_ID, Tag.ONLINE_EXAM_STUDENT.getGroup());
|
|
|
- Consumer consumer = ONSFactory.createConsumer(properties);
|
|
|
- consumer.subscribe(Topic.REPORT_TOPIC.getCode(), Tag.ONLINE_EXAM_STUDENT.getCode(), new MessageListener() {
|
|
|
- @Override
|
|
|
- public Action consume(Message message, ConsumeContext context) {
|
|
|
- try {
|
|
|
- String msg = new String(message.getBody(), "utf-8");
|
|
|
- onMessageExamStudent(msg);
|
|
|
- return Action.CommitMessage;
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("consumer failed MsgID:" + message.getMsgID(), e);
|
|
|
- return Action.ReconsumeLater;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- });
|
|
|
-
|
|
|
- consumer.start();
|
|
|
- }
|
|
|
-
|
|
|
- private static void onlineStudent() {
|
|
|
- properties.put(PropertyKeyConst.GROUP_ID, Tag.ONLINE_STUDENT.getGroup());
|
|
|
- Consumer consumer = ONSFactory.createConsumer(properties);
|
|
|
- consumer.subscribe(Topic.REPORT_TOPIC.getCode(), Tag.ONLINE_STUDENT.getCode(), new MessageListener() {
|
|
|
- @Override
|
|
|
- public Action consume(Message message, ConsumeContext context) {
|
|
|
- try {
|
|
|
- String msg = new String(message.getBody(), "utf-8");
|
|
|
- onMessageStudent(msg);
|
|
|
- return Action.CommitMessage;
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("consumer failed MsgID:" + message.getMsgID(), e);
|
|
|
- return Action.ReconsumeLater;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- });
|
|
|
-
|
|
|
- consumer.start();
|
|
|
- }
|
|
|
-
|
|
|
- private static void onlineUser() {
|
|
|
- properties.put(PropertyKeyConst.GROUP_ID, Tag.ONLINE_USER.getGroup());
|
|
|
- Consumer consumer = ONSFactory.createConsumer(properties);
|
|
|
- consumer.subscribe(Topic.REPORT_TOPIC.getCode(), Tag.ONLINE_USER.getCode(), new MessageListener() {
|
|
|
- @Override
|
|
|
- public Action consume(Message message, ConsumeContext context) {
|
|
|
- try {
|
|
|
- String msg = new String(message.getBody(), "utf-8");
|
|
|
- onMessageUser(msg);
|
|
|
- return Action.CommitMessage;
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("consumer failed MsgID:" + message.getMsgID(), e);
|
|
|
- return Action.ReconsumeLater;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- });
|
|
|
-
|
|
|
- consumer.start();
|
|
|
- }
|
|
|
-
|
|
|
- private static void operateInfo() {
|
|
|
- properties.put(PropertyKeyConst.GROUP_ID, Tag.OPERATE_INFO.getGroup());
|
|
|
- Consumer consumer = ONSFactory.createConsumer(properties);
|
|
|
- consumer.subscribe(Topic.REPORT_TOPIC.getCode(), Tag.OPERATE_INFO.getCode(), new MessageListener() {
|
|
|
- @Override
|
|
|
- public Action consume(Message message, ConsumeContext context) {
|
|
|
- try {
|
|
|
- String msg = new String(message.getBody(), "utf-8");
|
|
|
- onOperateInfo(msg, message.getMsgID());
|
|
|
- return Action.CommitMessage;
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("consumer failed MsgID:" + message.getMsgID(), e);
|
|
|
- return Action.ReconsumeLater;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- });
|
|
|
-
|
|
|
- consumer.start();
|
|
|
- }
|
|
|
-
|
|
|
- private static void onMessageStudent(String message) {
|
|
|
- OnlineStudentReport r = JSON.parseObject(message, OnlineStudentReport.class);
|
|
|
- StudentActive ac = new StudentActive();
|
|
|
- ac.setActiveTime(r.getReportTime().getTime());
|
|
|
- ac.setRootOrgId(r.getRootOrgId());
|
|
|
- ac.setStudentId(r.getStudentId());
|
|
|
- ActiveDataUtil.updateStudentActive(ac);
|
|
|
- onMessageLoginStudent(r.getReportTime().getTime(), r.getRootOrgId(), r.getStudentId());
|
|
|
- }
|
|
|
-
|
|
|
- private static void onMessageExamStudent(String message) {
|
|
|
- OnlineExamStudentReport r = JSON.parseObject(message, OnlineExamStudentReport.class);
|
|
|
- ExamStudentActive ac = new ExamStudentActive();
|
|
|
- ac.setActiveTime(r.getReportTime().getTime());
|
|
|
- ac.setRootOrgId(r.getRootOrgId());
|
|
|
- ac.setStudentId(r.getStudentId());
|
|
|
- ac.setExamId(r.getExamId());
|
|
|
- ActiveDataUtil.updateExamStudentActive(ac);
|
|
|
- onMessageStudent(r.getReportTime().getTime(), r.getRootOrgId(), r.getStudentId());
|
|
|
- onMessageLoginStudent(r.getReportTime().getTime(), r.getRootOrgId(), r.getStudentId());
|
|
|
- }
|
|
|
-
|
|
|
- private static void onMessageUser(String message) {
|
|
|
- OnlineUserReport r = JSON.parseObject(message, OnlineUserReport.class);
|
|
|
- UserActive ac = new UserActive();
|
|
|
- ac.setActiveTime(r.getReportTime().getTime());
|
|
|
- ac.setRootOrgId(r.getRootOrgId());
|
|
|
- ac.setUserId(r.getUserId());
|
|
|
- ActiveDataUtil.updateUserActive(ac);
|
|
|
- }
|
|
|
-
|
|
|
- private static void onMessageLoginStudent(Long reportTime, Long rootOrgId, Long studentId) {
|
|
|
- StudentLogin sl = new StudentLogin();
|
|
|
- sl.setActiveTime(reportTime);
|
|
|
- sl.setRootOrgId(rootOrgId);
|
|
|
- sl.setStudentId(studentId);
|
|
|
- ActiveDataUtil.updateStudentlogin(sl);
|
|
|
- }
|
|
|
-
|
|
|
- private static void onMessageStudent(Long reportTime, Long rootOrgId, Long studentId) {
|
|
|
- StudentActive ac = new StudentActive();
|
|
|
- ac.setActiveTime(reportTime);
|
|
|
- ac.setRootOrgId(rootOrgId);
|
|
|
- ac.setStudentId(studentId);
|
|
|
- ActiveDataUtil.updateStudentActive(ac);
|
|
|
- }
|
|
|
-
|
|
|
- private static void onOperateInfo(String message, String msgId) {
|
|
|
- OperateReport r = JSON.parseObject(message, OperateReport.class);
|
|
|
- operateService.saveOperate(r,msgId);
|
|
|
- }
|
|
|
+
|
|
|
+ private final static Logger LOG = LoggerFactory.getLogger(RocketMqConsumerListener.class);
|
|
|
+
|
|
|
+ private static OperateService operateService = SpringContextHolder.getBean(OperateService.class);
|
|
|
+
|
|
|
+ private static Properties properties = new Properties();
|
|
|
+ static {
|
|
|
+ properties.put(PropertyKeyConst.AccessKey, PropertyHolder.getString("$rocketmq-accesskey"));
|
|
|
+ // AccessKeySecret 阿里云身份验证,在阿里云服务器管理控制台创建。
|
|
|
+ properties.put(PropertyKeyConst.SecretKey, PropertyHolder.getString("$rocketmq-secretkey"));
|
|
|
+ // 设置发送超时时间,单位毫秒。
|
|
|
+ properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
|
|
|
+ // 设置 TCP 接入域名,进入控制台的实例详情页面的 TCP 协议客户端接入点区域查看。
|
|
|
+ properties.put(PropertyKeyConst.NAMESRV_ADDR, PropertyHolder.getString("$rocketmq-namesrv-addr"));
|
|
|
+ // 顺序消息消费失败进行重试前的等待时间,单位(毫秒),取值范围: 10 毫秒 ~ 30,000 毫秒
|
|
|
+ properties.put(PropertyKeyConst.SuspendTimeMillis, "100");
|
|
|
+ // 消息消费失败时的最大重试次数
|
|
|
+ properties.put(PropertyKeyConst.MaxReconsumeTimes, "10");
|
|
|
+ }
|
|
|
+
|
|
|
+ public static void start() {
|
|
|
+ onlineExamStudent();
|
|
|
+ onlineStudent();
|
|
|
+ onlineUser();
|
|
|
+ operateInfo();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void onlineExamStudent() {
|
|
|
+ properties.put(PropertyKeyConst.GROUP_ID, Tag.ONLINE_EXAM_STUDENT.getGroup());
|
|
|
+ Consumer consumer = ONSFactory.createConsumer(properties);
|
|
|
+ consumer.subscribe(ReportsUtil.getReportTopic(), Tag.ONLINE_EXAM_STUDENT.getCode(), new MessageListener() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Action consume(Message message, ConsumeContext context) {
|
|
|
+ try {
|
|
|
+ String msg = new String(message.getBody(), "utf-8");
|
|
|
+ onMessageExamStudent(msg);
|
|
|
+ return Action.CommitMessage;
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("consumer failed MsgID:" + message.getMsgID(), e);
|
|
|
+ return Action.ReconsumeLater;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ });
|
|
|
+
|
|
|
+ consumer.start();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void onlineStudent() {
|
|
|
+ properties.put(PropertyKeyConst.GROUP_ID, Tag.ONLINE_STUDENT.getGroup());
|
|
|
+ Consumer consumer = ONSFactory.createConsumer(properties);
|
|
|
+ consumer.subscribe(ReportsUtil.getReportTopic(), Tag.ONLINE_STUDENT.getCode(), new MessageListener() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Action consume(Message message, ConsumeContext context) {
|
|
|
+ try {
|
|
|
+ String msg = new String(message.getBody(), "utf-8");
|
|
|
+ onMessageStudent(msg);
|
|
|
+ return Action.CommitMessage;
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("consumer failed MsgID:" + message.getMsgID(), e);
|
|
|
+ return Action.ReconsumeLater;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ });
|
|
|
+
|
|
|
+ consumer.start();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void onlineUser() {
|
|
|
+ properties.put(PropertyKeyConst.GROUP_ID, Tag.ONLINE_USER.getGroup());
|
|
|
+ Consumer consumer = ONSFactory.createConsumer(properties);
|
|
|
+ consumer.subscribe(ReportsUtil.getReportTopic(), Tag.ONLINE_USER.getCode(), new MessageListener() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Action consume(Message message, ConsumeContext context) {
|
|
|
+ try {
|
|
|
+ String msg = new String(message.getBody(), "utf-8");
|
|
|
+ onMessageUser(msg);
|
|
|
+ return Action.CommitMessage;
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("consumer failed MsgID:" + message.getMsgID(), e);
|
|
|
+ return Action.ReconsumeLater;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ });
|
|
|
+
|
|
|
+ consumer.start();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void operateInfo() {
|
|
|
+ properties.put(PropertyKeyConst.GROUP_ID, Tag.OPERATE_INFO.getGroup());
|
|
|
+ Consumer consumer = ONSFactory.createConsumer(properties);
|
|
|
+ consumer.subscribe(ReportsUtil.getReportTopic(), Tag.OPERATE_INFO.getCode(), new MessageListener() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Action consume(Message message, ConsumeContext context) {
|
|
|
+ try {
|
|
|
+ String msg = new String(message.getBody(), "utf-8");
|
|
|
+ onOperateInfo(msg, message.getMsgID());
|
|
|
+ return Action.CommitMessage;
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("consumer failed MsgID:" + message.getMsgID(), e);
|
|
|
+ return Action.ReconsumeLater;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ });
|
|
|
+
|
|
|
+ consumer.start();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void onMessageStudent(String message) {
|
|
|
+ OnlineStudentReport r = JSON.parseObject(message, OnlineStudentReport.class);
|
|
|
+ StudentActive ac = new StudentActive();
|
|
|
+ ac.setActiveTime(r.getReportTime().getTime());
|
|
|
+ ac.setRootOrgId(r.getRootOrgId());
|
|
|
+ ac.setStudentId(r.getStudentId());
|
|
|
+ ActiveDataUtil.updateStudentActive(ac);
|
|
|
+ onMessageLoginStudent(r.getReportTime().getTime(), r.getRootOrgId(), r.getStudentId());
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void onMessageExamStudent(String message) {
|
|
|
+ OnlineExamStudentReport r = JSON.parseObject(message, OnlineExamStudentReport.class);
|
|
|
+ ExamStudentActive ac = new ExamStudentActive();
|
|
|
+ ac.setActiveTime(r.getReportTime().getTime());
|
|
|
+ ac.setRootOrgId(r.getRootOrgId());
|
|
|
+ ac.setStudentId(r.getStudentId());
|
|
|
+ ac.setExamId(r.getExamId());
|
|
|
+ ActiveDataUtil.updateExamStudentActive(ac);
|
|
|
+ onMessageStudent(r.getReportTime().getTime(), r.getRootOrgId(), r.getStudentId());
|
|
|
+ onMessageLoginStudent(r.getReportTime().getTime(), r.getRootOrgId(), r.getStudentId());
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void onMessageUser(String message) {
|
|
|
+ OnlineUserReport r = JSON.parseObject(message, OnlineUserReport.class);
|
|
|
+ UserActive ac = new UserActive();
|
|
|
+ ac.setActiveTime(r.getReportTime().getTime());
|
|
|
+ ac.setRootOrgId(r.getRootOrgId());
|
|
|
+ ac.setUserId(r.getUserId());
|
|
|
+ ActiveDataUtil.updateUserActive(ac);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void onMessageLoginStudent(Long reportTime, Long rootOrgId, Long studentId) {
|
|
|
+ StudentLogin sl = new StudentLogin();
|
|
|
+ sl.setActiveTime(reportTime);
|
|
|
+ sl.setRootOrgId(rootOrgId);
|
|
|
+ sl.setStudentId(studentId);
|
|
|
+ ActiveDataUtil.updateStudentlogin(sl);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void onMessageStudent(Long reportTime, Long rootOrgId, Long studentId) {
|
|
|
+ StudentActive ac = new StudentActive();
|
|
|
+ ac.setActiveTime(reportTime);
|
|
|
+ ac.setRootOrgId(rootOrgId);
|
|
|
+ ac.setStudentId(studentId);
|
|
|
+ ActiveDataUtil.updateStudentActive(ac);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void onOperateInfo(String message, String msgId) {
|
|
|
+ OperateReport r = JSON.parseObject(message, OperateReport.class);
|
|
|
+ operateService.saveOperate(r, msgId);
|
|
|
+ }
|
|
|
}
|