xiatian 5 gadi atpakaļ
vecāks
revīzija
85346e73f9

+ 19 - 12
src/main/java/cn/com/qmth/examcloud/reports/commons/handler/KafkaSendResultHandler.java

@@ -1,22 +1,29 @@
 package cn.com.qmth.examcloud.reports.commons.handler;
 
-import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.kafka.support.ProducerListener;
 
-@SuppressWarnings("rawtypes")
-public class KafkaSendResultHandler implements ProducerListener {
+public class KafkaSendResultHandler implements Callback {
 	private final static Logger logger = LoggerFactory.getLogger(KafkaSendResultHandler.class);
+	private final String topic;
+    private final String message;
+    
+    
+	public KafkaSendResultHandler(String topic,String message) {
+		super();
+		this.topic = topic;
+		this.message = message;
+	}
 
-    @Override
-    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
-    	logger.debug("kafka message send success : " + producerRecord.toString());
-    }
 
-    @Override
-    public void onError(ProducerRecord producerRecord, Exception exception) {
-    	logger.error("kafka message send error : " + producerRecord.toString(),exception);
-    }
+	@Override
+	public void onCompletion(RecordMetadata metadata, Exception exception) {
+		if(exception==null) {
+			logger.debug("kafka message send success : "+" "+topic+" "+message);
+		}else {
+			logger.error("kafka message send error :"+" "+topic+" "+message,exception);
+		}
+	}
 }

+ 11 - 19
src/main/java/cn/com/qmth/examcloud/reports/commons/util/ReportsUtil.java

@@ -5,23 +5,21 @@ import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
 
 import com.alibaba.fastjson.JSON;
 
 import cn.com.qmth.examcloud.commons.util.ThreadLocalUtil;
 import cn.com.qmth.examcloud.reports.commons.bean.BaseReport;
-import cn.com.qmth.examcloud.reports.commons.bean.OnlineExamStudentReport;
-import cn.com.qmth.examcloud.reports.commons.bean.OnlineStudentReport;
-import cn.com.qmth.examcloud.reports.commons.bean.OnlineUserReport;
 import cn.com.qmth.examcloud.reports.commons.enums.MqType;
 import cn.com.qmth.examcloud.reports.commons.handler.KafkaSendResultHandler;
 import cn.com.qmth.examcloud.web.bootstrap.PropertyHolder;
-import cn.com.qmth.examcloud.web.support.SpringContextHolder;
 
 @SuppressWarnings("unchecked")
 public class ReportsUtil {
@@ -29,17 +27,19 @@ public class ReportsUtil {
 
 	private final static String key = "report";
 
-	private static KafkaTemplate<String, String> kafkaTemplate;
+	private static KafkaProducer<String, String> kafkaProducer;
 
 	private static String mqType = PropertyHolder.getString("$report.mq-type");
 
 	private static Boolean reportEnable = PropertyHolder.getBoolean("$report.enable", false);
-
 	static {
 		if (reportEnable) {
 			if (MqType.KAFKA.getCode().equals(mqType)) {
-				kafkaTemplate = SpringContextHolder.getBean(KafkaTemplate.class);
-				kafkaTemplate.setProducerListener(new KafkaSendResultHandler());
+				Properties props = new Properties();
+				props.put("bootstrap.servers", PropertyHolder.getString("$kafka-bootstrap-servers"));
+				props.put("key.serializer", PropertyHolder.getString("$kafka-key-serializer"));
+				props.put("value.serializer", PropertyHolder.getString("$kafka-value-serializer"));
+				kafkaProducer = new KafkaProducer<String, String>(props);
 			} else if (MqType.ROCKETMQ.getCode().equals(mqType)) {
 				// TODO
 			} else {
@@ -54,16 +54,8 @@ public class ReportsUtil {
 			for (BaseReport b : list) {
 				if (!onException || (onException && b.getReportOnException())) {
 					try {
-						if (b instanceof OnlineExamStudentReport) {
-							OnlineExamStudentReport ob = (OnlineExamStudentReport) b;
-							kafkaTemplate.send(ob.getTopic(), JSON.toJSONString(b));
-						} else if (b instanceof OnlineStudentReport) {
-							OnlineStudentReport ob = (OnlineStudentReport) b;
-							kafkaTemplate.send(ob.getTopic(), JSON.toJSONString(b));
-						} else if (b instanceof OnlineUserReport) {
-							OnlineUserReport ob = (OnlineUserReport) b;
-							kafkaTemplate.send(ob.getTopic(), JSON.toJSONString(b));
-						}
+						String messageStr=JSON.toJSONString(b);
+						kafkaProducer.send(new ProducerRecord<>(b.getTopic(),messageStr), new KafkaSendResultHandler(b.getTopic(), messageStr));
 					} catch (Exception e) {
 						logger.error("SendReport Error:" + JSON.toJSONString(b), e);
 					}