|
@@ -1,21 +1,19 @@
|
|
|
package cn.com.qmth.examcloud.commons.helpers.pipeline;
|
|
|
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map.Entry;
|
|
|
-import java.util.Set;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
-
|
|
|
-import org.apache.logging.log4j.ThreadContext;
|
|
|
-
|
|
|
-import com.google.common.collect.Lists;
|
|
|
-
|
|
|
import cn.com.qmth.examcloud.commons.helpers.KeyValuePair;
|
|
|
import cn.com.qmth.examcloud.commons.helpers.ObjectHolder;
|
|
|
import cn.com.qmth.examcloud.commons.util.ThreadLocalUtil;
|
|
|
import cn.com.qmth.examcloud.commons.util.Util;
|
|
|
+import com.google.common.collect.Lists;
|
|
|
+import org.apache.logging.log4j.ThreadContext;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map.Entry;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
/**
|
|
|
* 简单节点
|
|
|
*
|
|
@@ -24,223 +22,214 @@ import org.slf4j.LoggerFactory;
|
|
|
* @Copyright (c) 2018-2020 http://www.qmth.com.cn/ All Rights Reserved.
|
|
|
*/
|
|
|
public class SimpleNode<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
|
|
|
- implements
|
|
|
- Node<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
|
|
|
-
|
|
|
- private static final Logger LOG = LoggerFactory.getLogger(SimpleNode.class);
|
|
|
-
|
|
|
- private String nodeName;
|
|
|
-
|
|
|
- private Storer<KEYIN, VALUEIN> storer = new Storer<KEYIN, VALUEIN>();
|
|
|
-
|
|
|
- private Counter counter = new Counter();
|
|
|
-
|
|
|
- private TaskContext context;
|
|
|
-
|
|
|
- private NodeExecuter<KEYIN, VALUEIN, KEYOUT, VALUEOUT> executer;
|
|
|
-
|
|
|
- private SimpleNode<KEYIN, VALUEIN, KEYOUT, VALUEOUT> myself;
|
|
|
-
|
|
|
- private Node<KEYOUT, VALUEOUT, ?, ?> lowerNode;
|
|
|
-
|
|
|
- private Storer<KEYOUT, VALUEOUT> lowerStorer;
|
|
|
-
|
|
|
- private boolean first;
|
|
|
-
|
|
|
- private int sleep = 10;
|
|
|
-
|
|
|
- private String traceId = ThreadLocalUtil.getTraceId();
|
|
|
-
|
|
|
- /**
|
|
|
- * 构造函数
|
|
|
- *
|
|
|
- * @param nodeName
|
|
|
- * @param context
|
|
|
- */
|
|
|
- public SimpleNode(String nodeName, NodeExecuter<KEYIN, VALUEIN, KEYOUT, VALUEOUT> executer,
|
|
|
- TaskContext context) {
|
|
|
- super();
|
|
|
- this.nodeName = nodeName;
|
|
|
- this.executer = executer;
|
|
|
- this.context = context;
|
|
|
- this.myself = this;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 启动
|
|
|
- *
|
|
|
- * @author WANGWEI
|
|
|
- */
|
|
|
- @Override
|
|
|
- public void start() {
|
|
|
- this.startNodeExecuter();
|
|
|
-
|
|
|
- this.startLogThread();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 创建节点执行器
|
|
|
- *
|
|
|
- * @author WANGWEI
|
|
|
- */
|
|
|
- private void startNodeExecuter() {
|
|
|
- new Thread(() -> {
|
|
|
-
|
|
|
- while (true) {
|
|
|
- traceId = ThreadLocalUtil.getTraceId();
|
|
|
- // 设置log4j线程上下文
|
|
|
- ThreadContext.put("TRACE_ID", traceId);
|
|
|
-
|
|
|
- LOG.info(new StringBuilder("[" + nodeName + "]. ").append("start ... ...")
|
|
|
- .toString());
|
|
|
- counter = new Counter();
|
|
|
- if (isFirst()) {
|
|
|
- execute(null, null);
|
|
|
- } else {
|
|
|
- Set<Entry<KEYIN, VALUEIN>> entrySet = getStorer().getEntrySet();
|
|
|
-
|
|
|
- for (Entry<KEYIN, VALUEIN> entry : entrySet) {
|
|
|
- execute(entry.getKey(), entry.getValue());
|
|
|
- }
|
|
|
- LOG.info(new StringBuilder("[" + nodeName + "]. ").append("end .status: ")
|
|
|
- .append(myself.getNodeStatusInfo()).toString());
|
|
|
- }
|
|
|
- try {
|
|
|
- Util.sleep(getSleep());
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("sleep Exception.", e);
|
|
|
- }
|
|
|
-
|
|
|
- // 清理log4j线程上下文
|
|
|
- ThreadContext.clearAll();
|
|
|
- }
|
|
|
-
|
|
|
- }).start();
|
|
|
- }
|
|
|
-
|
|
|
- private void execute(KEYIN key, VALUEIN value) {
|
|
|
- long s = System.currentTimeMillis();
|
|
|
- try {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(new StringBuilder("[" + nodeName + "]. ").append("handle entry. key=")
|
|
|
- .append(null == key ? null : key.toString()).append("; value=")
|
|
|
- .append(null == value ? null : value.toString()).toString());
|
|
|
- }
|
|
|
- counter.incrementTotal();
|
|
|
- List<KeyValuePair<KEYOUT, VALUEOUT>> outList = Lists.newLinkedList();
|
|
|
- ObjectHolder<Boolean> removable = new ObjectHolder<Boolean>(true);
|
|
|
- executer.execute(key, value, outList, removable, context);
|
|
|
-
|
|
|
- if (null != outList && null != getLowerStorer()) {
|
|
|
- for (KeyValuePair<KEYOUT, VALUEOUT> pair : outList) {
|
|
|
- getLowerStorer().putElement(pair.getKey(), pair.getValue());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (null != removable.get() && removable.get()) {
|
|
|
- if (null != key) {
|
|
|
- getStorer().remove(key);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- counter.incrementSuccessAmount();
|
|
|
-
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(new StringBuilder("[" + nodeName + "]. ")
|
|
|
- .append("handle entry successfully. key=")
|
|
|
- .append(null == key ? null : key.toString()).append("; value=")
|
|
|
- .append(null == value ? null : value.toString()).toString());
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- counter.incrementFailureAmount();
|
|
|
- if (LOG.isErrorEnabled()) {
|
|
|
- LOG.error(new StringBuilder("[" + nodeName + "]. ")
|
|
|
- .append("fail to handle entry. key=")
|
|
|
- .append(null == key ? null : key.toString()).append("; value=")
|
|
|
- .append(null == value ? null : value.toString()).toString(), e);
|
|
|
- }
|
|
|
- } finally {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(new StringBuilder("[" + nodeName + "]. ").append("cost ")
|
|
|
- .append(System.currentTimeMillis() - s).append("ms.").toString());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 启动日志线程
|
|
|
- *
|
|
|
- * @author WANGWEI
|
|
|
- */
|
|
|
- private void startLogThread() {
|
|
|
- new Thread(() -> {
|
|
|
- while (true) {
|
|
|
- // 设置log4j线程上下文
|
|
|
- ThreadContext.put("TRACE_ID", traceId);
|
|
|
-
|
|
|
- Util.sleep(TimeUnit.SECONDS, 2);
|
|
|
- if (LOG.isInfoEnabled()) {
|
|
|
- LOG.info(new StringBuilder("[" + nodeName + "]. ").append("status: ")
|
|
|
- .append(myself.getNodeStatusInfo()).toString());
|
|
|
- }
|
|
|
-
|
|
|
- // 清理log4j线程上下文
|
|
|
- ThreadContext.clearAll();
|
|
|
- }
|
|
|
-
|
|
|
- }).start();
|
|
|
- }
|
|
|
-
|
|
|
- private String getNodeStatusInfo() {
|
|
|
- long total = 0;
|
|
|
- long successAmount = 0;
|
|
|
- long failureAmount = 0;
|
|
|
- Counter c = this.counter;
|
|
|
- if (null != c) {
|
|
|
- total = c.getTotal();
|
|
|
- successAmount = c.getSuccessAmount();
|
|
|
- failureAmount = c.getFailureAmount();
|
|
|
- }
|
|
|
- return new StringBuilder("Total=").append(total).append(",SuccessAmount=")
|
|
|
- .append(successAmount).append(",FailureAmount=").append(failureAmount).toString();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void setLowerNode(Node<KEYOUT, VALUEOUT, ?, ?> lowerNode) {
|
|
|
- this.lowerNode = lowerNode;
|
|
|
- lowerStorer = this.lowerNode.getStorer();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public Node<KEYOUT, VALUEOUT, ?, ?> getLowerNode() {
|
|
|
- return this.lowerNode;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public Storer<KEYIN, VALUEIN> getStorer() {
|
|
|
- return this.storer;
|
|
|
- }
|
|
|
-
|
|
|
- public Storer<KEYOUT, VALUEOUT> getLowerStorer() {
|
|
|
- return lowerStorer;
|
|
|
- }
|
|
|
-
|
|
|
- public boolean isFirst() {
|
|
|
- return first;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void setFirst(boolean first) {
|
|
|
- this.first = first;
|
|
|
- }
|
|
|
-
|
|
|
- public int getSleep() {
|
|
|
- return sleep;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void setSleep(int sleep) {
|
|
|
- this.sleep = sleep;
|
|
|
- }
|
|
|
+ implements Node<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
|
|
|
+
|
|
|
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleNode.class);
|
|
|
+
|
|
|
+ private String nodeName;
|
|
|
+
|
|
|
+ private Storer<KEYIN, VALUEIN> storer = new Storer<>();
|
|
|
+
|
|
|
+ private Counter counter = new Counter();
|
|
|
+
|
|
|
+ private TaskContext context;
|
|
|
+
|
|
|
+ private NodeExecuter<KEYIN, VALUEIN, KEYOUT, VALUEOUT> executer;
|
|
|
+
|
|
|
+ private SimpleNode<KEYIN, VALUEIN, KEYOUT, VALUEOUT> myself;
|
|
|
+
|
|
|
+ private Node<KEYOUT, VALUEOUT, ?, ?> lowerNode;
|
|
|
+
|
|
|
+ private Storer<KEYOUT, VALUEOUT> lowerStorer;
|
|
|
+
|
|
|
+ private boolean first;
|
|
|
+
|
|
|
+ private int sleep = 10;
|
|
|
+
|
|
|
+ private String traceId = ThreadLocalUtil.getTraceId();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 构造函数
|
|
|
+ *
|
|
|
+ * @param nodeName
|
|
|
+ * @param context
|
|
|
+ */
|
|
|
+ public SimpleNode(String nodeName, NodeExecuter<KEYIN, VALUEIN, KEYOUT, VALUEOUT> executer,
|
|
|
+ TaskContext context) {
|
|
|
+ super();
|
|
|
+ this.nodeName = nodeName;
|
|
|
+ this.executer = executer;
|
|
|
+ this.context = context;
|
|
|
+ this.myself = this;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 启动
|
|
|
+ *
|
|
|
+ * @author WANGWEI
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void start() {
|
|
|
+ this.startNodeExecuter();
|
|
|
+
|
|
|
+ this.startLogThread();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 创建节点执行器
|
|
|
+ *
|
|
|
+ * @author WANGWEI
|
|
|
+ */
|
|
|
+ private void startNodeExecuter() {
|
|
|
+ new Thread(() -> {
|
|
|
+
|
|
|
+ while (true) {
|
|
|
+ traceId = ThreadLocalUtil.getTraceId();
|
|
|
+ // 设置log4j线程上下文
|
|
|
+ ThreadContext.put("TRACE_ID", traceId);
|
|
|
+
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("{}, {} start", myself.getNodeStatusInfo(), nodeName);
|
|
|
+ }
|
|
|
+
|
|
|
+ counter = new Counter();
|
|
|
+ if (isFirst()) {
|
|
|
+ execute(null, null);
|
|
|
+ } else {
|
|
|
+ Set<Entry<KEYIN, VALUEIN>> entrySet = getStorer().getEntrySet();
|
|
|
+
|
|
|
+ for (Entry<KEYIN, VALUEIN> entry : entrySet) {
|
|
|
+ execute(entry.getKey(), entry.getValue());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("{}, {} end", myself.getNodeStatusInfo(), nodeName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ Util.sleep(getSleep());
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error(e.getMessage(), e);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 清理log4j线程上下文
|
|
|
+ ThreadContext.clearAll();
|
|
|
+ }
|
|
|
+
|
|
|
+ }).start();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void execute(KEYIN key, VALUEIN value) {
|
|
|
+ long s = System.currentTimeMillis();
|
|
|
+ try {
|
|
|
+ counter.incrementTotal();
|
|
|
+ List<KeyValuePair<KEYOUT, VALUEOUT>> outList = Lists.newLinkedList();
|
|
|
+ ObjectHolder<Boolean> removable = new ObjectHolder<>(true);
|
|
|
+ executer.execute(key, value, outList, removable, context);
|
|
|
+
|
|
|
+ if (null != outList && null != getLowerStorer()) {
|
|
|
+ for (KeyValuePair<KEYOUT, VALUEOUT> pair : outList) {
|
|
|
+ getLowerStorer().putElement(pair.getKey(), pair.getValue());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (null != removable.get() && removable.get()) {
|
|
|
+ if (null != key) {
|
|
|
+ getStorer().remove(key);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ counter.incrementSuccessAmount();
|
|
|
+
|
|
|
+ if (LOG.isInfoEnabled()) {
|
|
|
+ LOG.info("{}, {} execute successful... key = {}, cost {} ms",
|
|
|
+ myself.getNodeStatusInfo(), nodeName,
|
|
|
+ key != null ? key.toString() : null,
|
|
|
+ System.currentTimeMillis() - s);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ counter.incrementFailureAmount();
|
|
|
+
|
|
|
+ LOG.error("{}, {} execute fail... key = {}, value = {}",
|
|
|
+ myself.getNodeStatusInfo(), nodeName,
|
|
|
+ key != null ? key.toString() : null,
|
|
|
+ value != null ? value.toString() : null, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 启动日志线程
|
|
|
+ *
|
|
|
+ * @author WANGWEI
|
|
|
+ */
|
|
|
+ private void startLogThread() {
|
|
|
+ new Thread(() -> {
|
|
|
+ while (true) {
|
|
|
+ // 设置log4j线程上下文
|
|
|
+ ThreadContext.put("TRACE_ID", traceId);
|
|
|
+
|
|
|
+ Util.sleep(TimeUnit.SECONDS, 2);
|
|
|
+
|
|
|
+ // 清理log4j线程上下文
|
|
|
+ ThreadContext.clearAll();
|
|
|
+ }
|
|
|
+ }).start();
|
|
|
+ }
|
|
|
+
|
|
|
+ private String getNodeStatusInfo() {
|
|
|
+ long total = 0;
|
|
|
+ long successAmount = 0;
|
|
|
+ long failureAmount = 0;
|
|
|
+ Counter c = this.counter;
|
|
|
+ if (null != c) {
|
|
|
+ total = c.getTotal();
|
|
|
+ successAmount = c.getSuccessAmount();
|
|
|
+ failureAmount = c.getFailureAmount();
|
|
|
+ }
|
|
|
+
|
|
|
+ return new StringBuilder().append("Total = ").append(total)
|
|
|
+ .append(", Success = ").append(successAmount)
|
|
|
+ .append(", Fail = ").append(failureAmount)
|
|
|
+ .toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void setLowerNode(Node<KEYOUT, VALUEOUT, ?, ?> lowerNode) {
|
|
|
+ this.lowerNode = lowerNode;
|
|
|
+ lowerStorer = this.lowerNode.getStorer();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Node<KEYOUT, VALUEOUT, ?, ?> getLowerNode() {
|
|
|
+ return this.lowerNode;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Storer<KEYIN, VALUEIN> getStorer() {
|
|
|
+ return this.storer;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Storer<KEYOUT, VALUEOUT> getLowerStorer() {
|
|
|
+ return lowerStorer;
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean isFirst() {
|
|
|
+ return first;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void setFirst(boolean first) {
|
|
|
+ this.first = first;
|
|
|
+ }
|
|
|
+
|
|
|
+ public int getSleep() {
|
|
|
+ return sleep;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void setSleep(int sleep) {
|
|
|
+ this.sleep = sleep;
|
|
|
+ }
|
|
|
|
|
|
}
|