wangwei 5 vuotta sitten
vanhempi
commit
6afe12add6

+ 3 - 3
src/main/java/cn/com/qmth/examcloud/commons/helpers/pipeline/Node.java

@@ -49,11 +49,11 @@ public interface Node<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
 	void setFirst(boolean first);
 
 	/**
-	 * 设置循环周期
+	 * 设置循环间隔
 	 *
 	 * @author WANGWEI
-	 * @param cyclePeriod
+	 * @param sleep
 	 */
-	void setCyclePeriod(int cyclePeriod);
+	void setSleep(int sleep);
 
 }

+ 31 - 14
src/main/java/cn/com/qmth/examcloud/commons/helpers/pipeline/SimpleNode.java

@@ -4,8 +4,11 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.logging.log4j.ThreadContext;
+
 import cn.com.qmth.examcloud.commons.logging.ExamCloudLog;
 import cn.com.qmth.examcloud.commons.logging.ExamCloudLogFactory;
+import cn.com.qmth.examcloud.commons.util.ThreadLocalUtil;
 import cn.com.qmth.examcloud.commons.util.Util;
 
 /**
@@ -23,8 +26,6 @@ public class SimpleNode<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 
 	private String nodeName;
 
-	private int cyclePeriod = 10;
-
 	private Storer<KEYIN, VALUEIN> storer = new Storer<KEYIN, VALUEIN>();
 
 	private Counter counter = new Counter();
@@ -41,6 +42,10 @@ public class SimpleNode<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 
 	private boolean first;
 
+	private int sleep = 10;
+
+	private String traceId = ThreadLocalUtil.getTraceId();
+
 	/**
 	 * 构造函数
 	 *
@@ -78,6 +83,10 @@ public class SimpleNode<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 		new Thread(() -> {
 
 			while (true) {
+				traceId = ThreadLocalUtil.getTraceId();
+				// 设置log4j线程上下文
+				ThreadContext.put("TRACE_ID", traceId);
+
 				LOG.info(new StringBuilder("[" + nodeName + "]. ").append("start ... ...")
 						.toString());
 				counter = new Counter();
@@ -88,18 +97,21 @@ public class SimpleNode<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 
 					for (Entry<KEYIN, VALUEIN> entry : entrySet) {
 						execute(entry.getKey(), entry.getValue());
+						getStorer().remove(entry.getKey());
 					}
 				}
 
 				try {
-					Util.sleep(getCyclePeriod());
+					Util.sleep(getSleep());
 				} catch (Exception e) {
 					LOG.error("sleep Exception.", e);
 				}
 
+				// 清理log4j线程上下文
+				ThreadContext.clearAll();
 			}
 
-		}, "NODE-" + nodeName).start();
+		}).start();
 	}
 
 	private void execute(KEYIN key, VALUEIN value) {
@@ -138,15 +150,20 @@ public class SimpleNode<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 	private void startLogThread() {
 		new Thread(() -> {
 			while (true) {
+				// 设置log4j线程上下文
+				ThreadContext.put("TRACE_ID", traceId);
 
 				Util.sleep(TimeUnit.SECONDS, 2);
 				if (LOG.isInfoEnabled()) {
 					LOG.debug(new StringBuilder("[" + nodeName + "]. ").append("status: ")
 							.append(myself.getNodeStatusInfo()).toString());
 				}
+
+				// 清理log4j线程上下文
+				ThreadContext.clearAll();
 			}
 
-		}, "NODE-" + nodeName).start();
+		}).start();
 	}
 
 	private String getNodeStatusInfo() {
@@ -179,15 +196,6 @@ public class SimpleNode<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 		return this.storer;
 	}
 
-	public int getCyclePeriod() {
-		return cyclePeriod;
-	}
-
-	@Override
-	public void setCyclePeriod(int cyclePeriod) {
-		this.cyclePeriod = cyclePeriod;
-	}
-
 	public Storer<KEYOUT, VALUEOUT> getLowerStorer() {
 		return lowerStorer;
 	}
@@ -201,4 +209,13 @@ public class SimpleNode<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 		this.first = first;
 	}
 
+	public int getSleep() {
+		return sleep;
+	}
+
+	@Override
+	public void setSleep(int sleep) {
+		this.sleep = sleep;
+	}
+
 }

+ 4 - 0
src/main/java/cn/com/qmth/examcloud/commons/helpers/pipeline/Storer.java

@@ -33,4 +33,8 @@ public class Storer<KEY, VALUE> {
 		return MAP.isEmpty();
 	}
 
+	public synchronized void remove(KEY key) {
+		MAP.remove(key);
+	}
+
 }