xiatian 5 years ago
parent
commit
b612f6381b

+ 79 - 76
examcloud-core-reports-api-provider/src/main/java/cn/com/qmth/examcloud/core/reports/api/listener/KafkaConsumerListener.java

@@ -25,89 +25,92 @@ import cn.com.qmth.examcloud.reports.commons.enums.Topic;
 import cn.com.qmth.examcloud.web.bootstrap.PropertyHolder;
 
 public class KafkaConsumerListener {
-	private final static Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);
 
-	private static Properties props;
+    private final static Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);
 
-	static {
-		props = new Properties();
-		props.put("group.id", "online-count-group");
-		props.put("bootstrap.servers", PropertyHolder.getString("$kafka-bootstrap-servers"));
-		props.put("key.deserializer", PropertyHolder.getString("$kafka-key-deserializer"));
-		props.put("value.deserializer", PropertyHolder.getString("$kafka-value-deserializer"));
-	}
+    private static Properties props;
 
-	public static void start() {
-		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
-		List<String> subs=Lists.newArrayList();
-		subs.add(Topic.STUDENT.getCode());
-		subs.add(Topic.EXAM_STUDENT.getCode());
-		subs.add(Topic.USER.getCode());
-		consumer.subscribe(subs);
-		try {
-		   for(;;) {
-		        // 100 是超时时间(ms),在该时间内 poll 会等待服务器返回数据
-		        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+    static {
+        props = new Properties();
+        props.put("group.id", "online-count-group");
+        props.put("bootstrap.servers", PropertyHolder.getString("$kafka-bootstrap-servers"));
+        props.put("key.deserializer", PropertyHolder.getString("$kafka-key-deserializer",
+                "org.apache.kafka.common.serialization.StringDeserializer"));
+        props.put("value.deserializer", PropertyHolder.getString("$kafka-value-deserializer",
+                "org.apache.kafka.common.serialization.StringDeserializer"));
+    }
 
-		        // poll 返回一个记录列表。
-		        // 每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。
-		        for (ConsumerRecord<String, String> record : records) {
-		        	if(Topic.STUDENT.getCode().equals(record.topic())) {
-		        		onMessageStudent(record.value());
-		        	}else if(Topic.EXAM_STUDENT.getCode().equals(record.topic())){
-		        		onMessageExamStudent(record.value());
-		        	}else if(Topic.USER.getCode().equals(record.topic())){
-		        		onMessageUser(record.value());
-		        	}
-		        }
-		    }
-		} catch(Exception e){
-			logger.error("消息消费线程出错",e);
-		}finally {
-		    // 关闭消费者,网络连接和 socket 也会随之关闭,并立即触发一次再均衡
-		    consumer.close();
-		}
-	}
+    public static void start() {
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
+        List<String> subs = Lists.newArrayList();
+        subs.add(Topic.STUDENT.getCode());
+        subs.add(Topic.EXAM_STUDENT.getCode());
+        subs.add(Topic.USER.getCode());
+        consumer.subscribe(subs);
+        try {
+            for (;;) {
+                // 100 是超时时间(ms),在该时间内 poll 会等待服务器返回数据
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 
-	private static void onMessageStudent(String message) {
-		logger.debug("STUDENT message:" + message);
-		OnlineStudentReport r = JSON.parseObject(message, OnlineStudentReport.class);
-		StudentActive ac = new StudentActive();
-		ac.setActiveTime(r.getReportTime().getTime());
-		ac.setRootOrgId(r.getRootOrgId());
-		ac.setStudentId(r.getStudentId());
-		ActiveDataUtil.updateStudentActive(ac);
-		onMessageLoginStudent(r.getReportTime().getTime(), r.getRootOrgId(), r.getStudentId());
-	}
+                // poll 返回一个记录列表。
+                // 每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。
+                for (ConsumerRecord<String, String> record : records) {
+                    if (Topic.STUDENT.getCode().equals(record.topic())) {
+                        onMessageStudent(record.value());
+                    } else if (Topic.EXAM_STUDENT.getCode().equals(record.topic())) {
+                        onMessageExamStudent(record.value());
+                    } else if (Topic.USER.getCode().equals(record.topic())) {
+                        onMessageUser(record.value());
+                    }
+                }
+            }
+        } catch (Exception e) {
+            logger.error("消息消费线程出错", e);
+        } finally {
+            // 关闭消费者,网络连接和 socket 也会随之关闭,并立即触发一次再均衡
+            consumer.close();
+        }
+    }
 
-	private static void onMessageExamStudent(String message) {
-		logger.debug("EXAM_STUDENT message:" + message);
-		OnlineExamStudentReport r = JSON.parseObject(message, OnlineExamStudentReport.class);
-		ExamStudentActive ac = new ExamStudentActive();
-		ac.setActiveTime(r.getReportTime().getTime());
-		ac.setRootOrgId(r.getRootOrgId());
-		ac.setStudentId(r.getStudentId());
-		ac.setExamId(r.getExamId());
-		ActiveDataUtil.updateExamStudentActive(ac);
-		onMessageLoginStudent(r.getReportTime().getTime(), r.getRootOrgId(), r.getStudentId());
-	}
+    private static void onMessageStudent(String message) {
+        logger.debug("STUDENT message:" + message);
+        OnlineStudentReport r = JSON.parseObject(message, OnlineStudentReport.class);
+        StudentActive ac = new StudentActive();
+        ac.setActiveTime(r.getReportTime().getTime());
+        ac.setRootOrgId(r.getRootOrgId());
+        ac.setStudentId(r.getStudentId());
+        ActiveDataUtil.updateStudentActive(ac);
+        onMessageLoginStudent(r.getReportTime().getTime(), r.getRootOrgId(), r.getStudentId());
+    }
 
-	private static void onMessageUser(String message) {
-		logger.debug("USER message:" + message);
-		OnlineUserReport r = JSON.parseObject(message, OnlineUserReport.class);
-		UserActive ac = new UserActive();
-		ac.setActiveTime(r.getReportTime().getTime());
-		ac.setRootOrgId(r.getRootOrgId());
-		ac.setUserId(r.getUserId());
-		ActiveDataUtil.updateUserActive(ac);
-	}
+    private static void onMessageExamStudent(String message) {
+        logger.debug("EXAM_STUDENT message:" + message);
+        OnlineExamStudentReport r = JSON.parseObject(message, OnlineExamStudentReport.class);
+        ExamStudentActive ac = new ExamStudentActive();
+        ac.setActiveTime(r.getReportTime().getTime());
+        ac.setRootOrgId(r.getRootOrgId());
+        ac.setStudentId(r.getStudentId());
+        ac.setExamId(r.getExamId());
+        ActiveDataUtil.updateExamStudentActive(ac);
+        onMessageLoginStudent(r.getReportTime().getTime(), r.getRootOrgId(), r.getStudentId());
+    }
 
-	private static void onMessageLoginStudent(Long reportTime,Long rootOrgId,Long studentId) {
-		StudentLogin sl = new StudentLogin();
-		sl.setActiveTime(reportTime);
-		sl.setRootOrgId(rootOrgId);
-		sl.setStudentId(studentId);
-		ActiveDataUtil.updateStudentlogin(sl);
-	}
+    private static void onMessageUser(String message) {
+        logger.debug("USER message:" + message);
+        OnlineUserReport r = JSON.parseObject(message, OnlineUserReport.class);
+        UserActive ac = new UserActive();
+        ac.setActiveTime(r.getReportTime().getTime());
+        ac.setRootOrgId(r.getRootOrgId());
+        ac.setUserId(r.getUserId());
+        ActiveDataUtil.updateUserActive(ac);
+    }
+
+    private static void onMessageLoginStudent(Long reportTime, Long rootOrgId, Long studentId) {
+        StudentLogin sl = new StudentLogin();
+        sl.setActiveTime(reportTime);
+        sl.setRootOrgId(rootOrgId);
+        sl.setStudentId(studentId);
+        ActiveDataUtil.updateStudentlogin(sl);
+    }
 
 }