deason 3 tahun lalu
induk
melakukan
7fbe3eb532

+ 209 - 227
examcloud-commons/src/main/java/cn/com/qmth/examcloud/commons/helpers/pipeline/SimpleNode.java

@@ -1,21 +1,19 @@
 package cn.com.qmth.examcloud.commons.helpers.pipeline;
 
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.ThreadContext;
-
-import com.google.common.collect.Lists;
-
 import cn.com.qmth.examcloud.commons.helpers.KeyValuePair;
 import cn.com.qmth.examcloud.commons.helpers.ObjectHolder;
 import cn.com.qmth.examcloud.commons.util.ThreadLocalUtil;
 import cn.com.qmth.examcloud.commons.util.Util;
+import com.google.common.collect.Lists;
+import org.apache.logging.log4j.ThreadContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 /**
  * 简单节点
  *
@@ -24,223 +22,207 @@ import org.slf4j.LoggerFactory;
  * @Copyright (c) 2018-2020 http://www.qmth.com.cn/ All Rights Reserved.
  */
 public class SimpleNode<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
-		implements
-			Node<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(SimpleNode.class);
-
-	private String nodeName;
-
-	private Storer<KEYIN, VALUEIN> storer = new Storer<KEYIN, VALUEIN>();
-
-	private Counter counter = new Counter();
-
-	private TaskContext context;
-
-	private NodeExecuter<KEYIN, VALUEIN, KEYOUT, VALUEOUT> executer;
-
-	private SimpleNode<KEYIN, VALUEIN, KEYOUT, VALUEOUT> myself;
-
-	private Node<KEYOUT, VALUEOUT, ?, ?> lowerNode;
-
-	private Storer<KEYOUT, VALUEOUT> lowerStorer;
-
-	private boolean first;
-
-	private int sleep = 10;
-
-	private String traceId = ThreadLocalUtil.getTraceId();
-
-	/**
-	 * 构造函数
-	 *
-	 * @param nodeName
-	 * @param context
-	 */
-	public SimpleNode(String nodeName, NodeExecuter<KEYIN, VALUEIN, KEYOUT, VALUEOUT> executer,
-			TaskContext context) {
-		super();
-		this.nodeName = nodeName;
-		this.executer = executer;
-		this.context = context;
-		this.myself = this;
-	}
-
-	/**
-	 * 启动
-	 *
-	 * @author WANGWEI
-	 */
-	@Override
-	public void start() {
-		this.startNodeExecuter();
-
-		this.startLogThread();
-	}
-
-	/**
-	 * 创建节点执行器
-	 *
-	 * @author WANGWEI
-	 */
-	private void startNodeExecuter() {
-		new Thread(() -> {
-
-			while (true) {
-				traceId = ThreadLocalUtil.getTraceId();
-				// 设置log4j线程上下文
-				ThreadContext.put("TRACE_ID", traceId);
-
-				LOG.info(new StringBuilder("[" + nodeName + "]. ").append("start ... ...")
-						.toString());
-				counter = new Counter();
-				if (isFirst()) {
-					execute(null, null);
-				} else {
-					Set<Entry<KEYIN, VALUEIN>> entrySet = getStorer().getEntrySet();
-
-					for (Entry<KEYIN, VALUEIN> entry : entrySet) {
-						execute(entry.getKey(), entry.getValue());
-					}
-					LOG.info(new StringBuilder("[" + nodeName + "]. ").append("end .status: ")
-							.append(myself.getNodeStatusInfo()).toString());
-				}
-				try {
-					Util.sleep(getSleep());
-				} catch (Exception e) {
-					LOG.error("sleep Exception.", e);
-				}
-
-				// 清理log4j线程上下文
-				ThreadContext.clearAll();
-			}
-
-		}).start();
-	}
-
-	private void execute(KEYIN key, VALUEIN value) {
-		long s = System.currentTimeMillis();
-		try {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug(new StringBuilder("[" + nodeName + "]. ").append("handle entry. key=")
-						.append(null == key ? null : key.toString()).append("; value=")
-						.append(null == value ? null : value.toString()).toString());
-			}
-			counter.incrementTotal();
-			List<KeyValuePair<KEYOUT, VALUEOUT>> outList = Lists.newLinkedList();
-			ObjectHolder<Boolean> removable = new ObjectHolder<Boolean>(true);
-			executer.execute(key, value, outList, removable, context);
-
-			if (null != outList && null != getLowerStorer()) {
-				for (KeyValuePair<KEYOUT, VALUEOUT> pair : outList) {
-					getLowerStorer().putElement(pair.getKey(), pair.getValue());
-				}
-			}
-
-			if (null != removable.get() && removable.get()) {
-				if (null != key) {
-					getStorer().remove(key);
-				}
-			}
-
-			counter.incrementSuccessAmount();
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug(new StringBuilder("[" + nodeName + "]. ")
-						.append("handle entry successfully. key=")
-						.append(null == key ? null : key.toString()).append("; value=")
-						.append(null == value ? null : value.toString()).toString());
-			}
-		} catch (Exception e) {
-			counter.incrementFailureAmount();
-			if (LOG.isErrorEnabled()) {
-				LOG.error(new StringBuilder("[" + nodeName + "]. ")
-						.append("fail to handle entry. key=")
-						.append(null == key ? null : key.toString()).append("; value=")
-						.append(null == value ? null : value.toString()).toString(), e);
-			}
-		} finally {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug(new StringBuilder("[" + nodeName + "]. ").append("cost ")
-						.append(System.currentTimeMillis() - s).append("ms.").toString());
-			}
-		}
-	}
-
-	/**
-	 * 启动日志线程
-	 *
-	 * @author WANGWEI
-	 */
-	private void startLogThread() {
-		new Thread(() -> {
-			while (true) {
-				// 设置log4j线程上下文
-				ThreadContext.put("TRACE_ID", traceId);
-
-				Util.sleep(TimeUnit.SECONDS, 2);
-				if (LOG.isInfoEnabled()) {
-					LOG.info(new StringBuilder("[" + nodeName + "]. ").append("status: ")
-							.append(myself.getNodeStatusInfo()).toString());
-				}
-
-				// 清理log4j线程上下文
-				ThreadContext.clearAll();
-			}
-
-		}).start();
-	}
-
-	private String getNodeStatusInfo() {
-		long total = 0;
-		long successAmount = 0;
-		long failureAmount = 0;
-		Counter c = this.counter;
-		if (null != c) {
-			total = c.getTotal();
-			successAmount = c.getSuccessAmount();
-			failureAmount = c.getFailureAmount();
-		}
-		return new StringBuilder("Total=").append(total).append(",SuccessAmount=")
-				.append(successAmount).append(",FailureAmount=").append(failureAmount).toString();
-	}
-
-	@Override
-	public void setLowerNode(Node<KEYOUT, VALUEOUT, ?, ?> lowerNode) {
-		this.lowerNode = lowerNode;
-		lowerStorer = this.lowerNode.getStorer();
-	}
-
-	@Override
-	public Node<KEYOUT, VALUEOUT, ?, ?> getLowerNode() {
-		return this.lowerNode;
-	}
-
-	@Override
-	public Storer<KEYIN, VALUEIN> getStorer() {
-		return this.storer;
-	}
-
-	public Storer<KEYOUT, VALUEOUT> getLowerStorer() {
-		return lowerStorer;
-	}
-
-	public boolean isFirst() {
-		return first;
-	}
-
-	@Override
-	public void setFirst(boolean first) {
-		this.first = first;
-	}
-
-	public int getSleep() {
-		return sleep;
-	}
-
-	@Override
-	public void setSleep(int sleep) {
-		this.sleep = sleep;
-	}
+        implements Node<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleNode.class);
+
+    private String nodeName;
+
+    private Storer<KEYIN, VALUEIN> storer = new Storer<>();
+
+    private Counter counter = new Counter();
+
+    private TaskContext context;
+
+    private NodeExecuter<KEYIN, VALUEIN, KEYOUT, VALUEOUT> executer;
+
+    private SimpleNode<KEYIN, VALUEIN, KEYOUT, VALUEOUT> myself;
+
+    private Node<KEYOUT, VALUEOUT, ?, ?> lowerNode;
+
+    private Storer<KEYOUT, VALUEOUT> lowerStorer;
+
+    private boolean first;
+
+    private int sleep = 10;
+
+    private String traceId = ThreadLocalUtil.getTraceId();
+
+    /**
+     * 构造函数
+     *
+     * @param nodeName
+     * @param context
+     */
+    public SimpleNode(String nodeName, NodeExecuter<KEYIN, VALUEIN, KEYOUT, VALUEOUT> executer,
+                      TaskContext context) {
+        super();
+        this.nodeName = nodeName;
+        this.executer = executer;
+        this.context = context;
+        this.myself = this;
+    }
+
+    /**
+     * 启动
+     *
+     * @author WANGWEI
+     */
+    @Override
+    public void start() {
+        this.startNodeExecuter();
+
+        this.startLogThread();
+    }
+
+    /**
+     * 创建节点执行器
+     *
+     * @author WANGWEI
+     */
+    private void startNodeExecuter() {
+        new Thread(() -> {
+
+            while (true) {
+                traceId = ThreadLocalUtil.getTraceId();
+                // 设置log4j线程上下文
+                ThreadContext.put("TRACE_ID", traceId);
+
+                LOG.debug("{} start...", nodeName);
+
+                counter = new Counter();
+                if (isFirst()) {
+                    execute(null, null);
+                } else {
+                    Set<Entry<KEYIN, VALUEIN>> entrySet = getStorer().getEntrySet();
+
+                    for (Entry<KEYIN, VALUEIN> entry : entrySet) {
+                        execute(entry.getKey(), entry.getValue());
+                    }
+
+                    LOG.info("{} executed... {}", nodeName, myself.getNodeStatusInfo());
+                }
+
+                try {
+                    Util.sleep(getSleep());
+                } catch (Exception e) {
+                    LOG.error(e.getMessage(), e);
+                }
+
+                // 清理log4j线程上下文
+                ThreadContext.clearAll();
+            }
+
+        }).start();
+    }
+
+    private void execute(KEYIN key, VALUEIN value) {
+        long s = System.currentTimeMillis();
+        try {
+            counter.incrementTotal();
+            List<KeyValuePair<KEYOUT, VALUEOUT>> outList = Lists.newLinkedList();
+            ObjectHolder<Boolean> removable = new ObjectHolder<>(true);
+            executer.execute(key, value, outList, removable, context);
+
+            if (null != outList && null != getLowerStorer()) {
+                for (KeyValuePair<KEYOUT, VALUEOUT> pair : outList) {
+                    getLowerStorer().putElement(pair.getKey(), pair.getValue());
+                }
+            }
+
+            if (null != removable.get() && removable.get()) {
+                if (null != key) {
+                    getStorer().remove(key);
+                }
+            }
+
+            counter.incrementSuccessAmount();
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("{} execute successful... key = {}, value = {}, cost {} ms", nodeName,
+                        key != null ? key.toString() : null, value != null ? value.toString() : null,
+                        System.currentTimeMillis() - s);
+            }
+        } catch (Exception e) {
+            counter.incrementFailureAmount();
+
+            LOG.error("{} execute fail... key = {}, value = {}", nodeName,
+                    key != null ? key.toString() : null, value != null ? value.toString() : null, e);
+        }
+    }
+
+    /**
+     * 启动日志线程
+     *
+     * @author WANGWEI
+     */
+    private void startLogThread() {
+        new Thread(() -> {
+            while (true) {
+                // 设置log4j线程上下文
+                ThreadContext.put("TRACE_ID", traceId);
+
+                Util.sleep(TimeUnit.SECONDS, 2);
+
+                // 清理log4j线程上下文
+                ThreadContext.clearAll();
+            }
+        }).start();
+    }
+
+    private String getNodeStatusInfo() {
+        long total = 0;
+        long successAmount = 0;
+        long failureAmount = 0;
+        Counter c = this.counter;
+        if (null != c) {
+            total = c.getTotal();
+            successAmount = c.getSuccessAmount();
+            failureAmount = c.getFailureAmount();
+        }
+
+        return new StringBuilder().append("Total = ").append(total)
+                .append(", SuccessAmount = ").append(successAmount)
+                .append(", FailureAmount = ").append(failureAmount)
+                .toString();
+    }
+
+    @Override
+    public void setLowerNode(Node<KEYOUT, VALUEOUT, ?, ?> lowerNode) {
+        this.lowerNode = lowerNode;
+        lowerStorer = this.lowerNode.getStorer();
+    }
+
+    @Override
+    public Node<KEYOUT, VALUEOUT, ?, ?> getLowerNode() {
+        return this.lowerNode;
+    }
+
+    @Override
+    public Storer<KEYIN, VALUEIN> getStorer() {
+        return this.storer;
+    }
+
+    public Storer<KEYOUT, VALUEOUT> getLowerStorer() {
+        return lowerStorer;
+    }
+
+    public boolean isFirst() {
+        return first;
+    }
+
+    @Override
+    public void setFirst(boolean first) {
+        this.first = first;
+    }
+
+    public int getSleep() {
+        return sleep;
+    }
+
+    @Override
+    public void setSleep(int sleep) {
+        this.sleep = sleep;
+    }
 
 }