xiatian il y a 5 ans
Parent
commit
30daa6c27f

+ 14 - 16
examcloud-core-reports-api-provider/src/main/java/cn/com/qmth/examcloud/core/reports/api/listener/KafkaConsumerListener.java

@@ -18,7 +18,6 @@ import cn.com.qmth.examcloud.core.reports.base.util.online.ExamStudentActive;
 import cn.com.qmth.examcloud.core.reports.base.util.online.StudentActive;
 import cn.com.qmth.examcloud.core.reports.base.util.online.StudentLogin;
 import cn.com.qmth.examcloud.core.reports.base.util.online.UserActive;
-import cn.com.qmth.examcloud.reports.commons.bean.LoginStudentReport;
 import cn.com.qmth.examcloud.reports.commons.bean.OnlineExamStudentReport;
 import cn.com.qmth.examcloud.reports.commons.bean.OnlineStudentReport;
 import cn.com.qmth.examcloud.reports.commons.bean.OnlineUserReport;
@@ -32,10 +31,10 @@ public class KafkaConsumerListener {
 
 	static {
 		props = new Properties();
-		props.put("group.id", PropertyHolder.getString("spring.kafka.consumer.group-id"));
-		props.put("bootstrap.servers", PropertyHolder.getString("spring.kafka.bootstrap-servers"));
-		props.put("key.deserializer", PropertyHolder.getString("spring.kafka.consumer.key-deserializer"));
-		props.put("value.deserializer", PropertyHolder.getString("spring.kafka.consumer.value-deserializer"));
+		props.put("group.id", "online-count-group");
+		props.put("bootstrap.servers", PropertyHolder.getString("$kafka-bootstrap-servers"));
+		props.put("key.deserializer", PropertyHolder.getString("$kafka-key-deserializer"));
+		props.put("value.deserializer", PropertyHolder.getString("$kafka-value-deserializer"));
 	}
 
 	public static void start() {
@@ -44,7 +43,6 @@ public class KafkaConsumerListener {
 		subs.add(Topic.STUDENT.getCode());
 		subs.add(Topic.EXAM_STUDENT.getCode());
 		subs.add(Topic.USER.getCode());
-		subs.add(Topic.STUDENT_LOGIN.getCode());
 		consumer.subscribe(subs);
 		try {
 		   for(;;) {
@@ -60,12 +58,12 @@ public class KafkaConsumerListener {
 		        		onMessageExamStudent(record.value());
 		        	}else if(Topic.USER.getCode().equals(record.topic())){
 		        		onMessageUser(record.value());
-		        	}else if(Topic.STUDENT_LOGIN.getCode().equals(record.topic())){
-		        		onMessageLoginStudent(record.value());
 		        	}
 		        }
 		    }
-		} finally {
+		} catch(Exception e){
+			logger.error("消息消费线程出错",e);
+		}finally {
 		    // 关闭消费者,网络连接和 socket 也会随之关闭,并立即触发一次再均衡
 		    consumer.close();
 		}
@@ -79,6 +77,7 @@ public class KafkaConsumerListener {
 		ac.setRootOrgId(r.getRootOrgId());
 		ac.setStudentId(r.getStudentId());
 		ActiveDataUtil.updateStudentActive(ac);
+		onMessageLoginStudent(r.getReportTime().getTime(), r.getRootOrgId(), r.getStudentId());
 	}
 
 	private static void onMessageExamStudent(String message) {
@@ -87,9 +86,10 @@ public class KafkaConsumerListener {
 		ExamStudentActive ac = new ExamStudentActive();
 		ac.setActiveTime(r.getReportTime().getTime());
 		ac.setRootOrgId(r.getRootOrgId());
-		ac.setExamStudentId(r.getExamStudentId());
+		ac.setStudentId(r.getStudentId());
 		ac.setExamId(r.getExamId());
 		ActiveDataUtil.updateExamStudentActive(ac);
+		onMessageLoginStudent(r.getReportTime().getTime(), r.getRootOrgId(), r.getStudentId());
 	}
 
 	private static void onMessageUser(String message) {
@@ -102,13 +102,11 @@ public class KafkaConsumerListener {
 		ActiveDataUtil.updateUserActive(ac);
 	}
 
-	private static void onMessageLoginStudent(String message) {
-		logger.debug("STUDENT_LOGIN message:" + message);
-		LoginStudentReport r = JSON.parseObject(message, LoginStudentReport.class);
+	private static void onMessageLoginStudent(Long reportTime,Long rootOrgId,Long studentId) {
 		StudentLogin sl = new StudentLogin();
-		sl.setActiveTime(r.getReportTime().getTime());
-		sl.setRootOrgId(r.getRootOrgId());
-		sl.setStudentId(r.getStudentId());
+		sl.setActiveTime(reportTime);
+		sl.setRootOrgId(rootOrgId);
+		sl.setStudentId(studentId);
 		ActiveDataUtil.updateStudentlogin(sl);
 	}
 

+ 1 - 1
examcloud-core-reports-base/src/main/java/cn/com/qmth/examcloud/core/reports/base/util/online/ActiveDataUtil.java

@@ -44,7 +44,7 @@ public class ActiveDataUtil {
 			map = new ConcurrentHashMap<Long, Long>();
 			examStudentActiveData.put(key, map);
 		}
-		map.put(ac.getExamStudentId(), ac.getActiveTime());
+		map.put(ac.getStudentId(), ac.getActiveTime());
 	}
 
 	public static void updateStudentlogin(StudentLogin sl) {

+ 5 - 5
examcloud-core-reports-base/src/main/java/cn/com/qmth/examcloud/core/reports/base/util/online/ExamStudentActive.java

@@ -3,7 +3,7 @@ package cn.com.qmth.examcloud.core.reports.base.util.online;
 public class ExamStudentActive {
 	private Long rootOrgId;
 	private Long examId;
-	private Long examStudentId;
+	private Long studentId;
 	private Long activeTime;
 	public Long getRootOrgId() {
 		return rootOrgId;
@@ -17,11 +17,11 @@ public class ExamStudentActive {
 	public void setExamId(Long examId) {
 		this.examId = examId;
 	}
-	public Long getExamStudentId() {
-		return examStudentId;
+	public Long getStudentId() {
+		return studentId;
 	}
-	public void setExamStudentId(Long examStudentId) {
-		this.examStudentId = examStudentId;
+	public void setStudentId(Long studentId) {
+		this.studentId = studentId;
 	}
 	public Long getActiveTime() {
 		return activeTime;

+ 9 - 3
examcloud-core-reports-starter/src/main/java/cn/com/qmth/examcloud/core/reports/starter/config/MqConsumerListenerStartup.java

@@ -13,10 +13,10 @@ import cn.com.qmth.examcloud.reports.commons.enums.MqType;
 import cn.com.qmth.examcloud.web.bootstrap.PropertyHolder;
 
 @Component
-@Order(99)
+@Order(999)
 public class MqConsumerListenerStartup implements ApplicationRunner {
 	private final static Logger logger = LoggerFactory.getLogger(MqConsumerListenerStartup.class);
-	public void start() {
+	private void startConsumerListener() {
 
 		String mqType = PropertyHolder.getString("$report.mq-type");
 
@@ -36,6 +36,12 @@ public class MqConsumerListenerStartup implements ApplicationRunner {
 
 	@Override
 	public void run(ApplicationArguments args) throws Exception {
-		start();
+		new Thread() {
+			@Override
+			public void run() {
+				startConsumerListener();
+			}
+			
+		}.start();
 	}
 }