wangwei 5 yıl önce
ebeveyn
işleme
fbc8114fa4

+ 43 - 0
src/main/java/cn/com/qmth/examcloud/commons/helpers/pipeline/Counter.java

@@ -0,0 +1,43 @@
+package cn.com.qmth.examcloud.commons.helpers.pipeline;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * 输出计数器
+ *
+ * @author WANGWEI
+ * @date 2019年12月12日
+ * @Copyright (c) 2018-2020 WANGWEI [QQ:522080330] All Rights Reserved.
+ */
+public class Counter {
+
+	private AtomicLong total = new AtomicLong(0);
+
+	private AtomicLong successAmount = new AtomicLong(0);
+
+	private AtomicLong failureAmount = new AtomicLong(0);
+
+	public long getTotal() {
+		return total.get();
+	}
+
+	public long getSuccessAmount() {
+		return successAmount.get();
+	}
+
+	public long getFailureAmount() {
+		return failureAmount.get();
+	}
+
+	public long incrementTotal() {
+		return this.total.incrementAndGet();
+	}
+
+	public long incrementSuccessAmount() {
+		return this.successAmount.incrementAndGet();
+	}
+
+	public long incrementFailureAmount() {
+		return this.failureAmount.incrementAndGet();
+	}
+}

+ 59 - 0
src/main/java/cn/com/qmth/examcloud/commons/helpers/pipeline/Node.java

@@ -0,0 +1,59 @@
+package cn.com.qmth.examcloud.commons.helpers.pipeline;
+
+/**
+ * 节点
+ *
+ * @author WANGWEI
+ * @date 2019年12月12日
+ * @Copyright (c) 2018-2020 WANGWEI [QQ:522080330] All Rights Reserved.
+ */
+public interface Node<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+	/**
+	 * 启动
+	 *
+	 * @author WANGWEI
+	 */
+	void start();
+
+	/**
+	 * 设置下级节点
+	 *
+	 * @author WANGWEI
+	 * @param subNode
+	 */
+	void setLowerNode(Node<KEYOUT, VALUEOUT, ?, ?> lowerNode);
+
+	/**
+	 * 获取下级节点
+	 *
+	 * @author WANGWEI
+	 * @return
+	 */
+	Node<KEYOUT, VALUEOUT, ?, ?> getLowerNode();
+
+	/**
+	 * 获取存储器
+	 *
+	 * @author WANGWEI
+	 * @return
+	 */
+	Storer<KEYIN, VALUEIN> getStorer();
+
+	/**
+	 * 设置是否是首节点
+	 *
+	 * @author WANGWEI
+	 * @param first
+	 */
+	void setFirst(boolean first);
+
+	/**
+	 * 设置循环周期
+	 *
+	 * @author WANGWEI
+	 * @param cyclePeriod
+	 */
+	void setCyclePeriod(int cyclePeriod);
+
+}

+ 19 - 0
src/main/java/cn/com/qmth/examcloud/commons/helpers/pipeline/NodeExecuter.java

@@ -0,0 +1,19 @@
+package cn.com.qmth.examcloud.commons.helpers.pipeline;
+
+/**
+ * 节点执行器
+ *
+ * @author WANGWEI
+ * @date 2019年12月12日
+ * @Copyright (c) 2018-2020 WANGWEI [QQ:522080330] All Rights Reserved.
+ * @param <KEYIN>
+ * @param <VALUEIN>
+ * @param <KEYOUT>
+ * @param <VALUEOUT>
+ */
+public interface NodeExecuter<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+	void execute(KEYIN key, VALUEIN value, Storer<KEYOUT, VALUEOUT> storer, TaskContext context)
+			throws Exception;
+
+}

+ 204 - 0
src/main/java/cn/com/qmth/examcloud/commons/helpers/pipeline/SimpleNode.java

@@ -0,0 +1,204 @@
+package cn.com.qmth.examcloud.commons.helpers.pipeline;
+
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLog;
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLogFactory;
+import cn.com.qmth.examcloud.commons.util.Util;
+
+/**
+ * 简单节点
+ *
+ * @author WANGWEI
+ * @date 2019年12月12日
+ * @Copyright (c) 2018-2020 WANGWEI [QQ:522080330] All Rights Reserved.
+ */
+public class SimpleNode<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+		implements
+			Node<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+	private static final ExamCloudLog LOG = ExamCloudLogFactory.getLog(SimpleNode.class);
+
+	private String nodeName;
+
+	private int cyclePeriod = 10;
+
+	private Storer<KEYIN, VALUEIN> storer = new Storer<KEYIN, VALUEIN>();
+
+	private Counter counter = new Counter();
+
+	private TaskContext context;
+
+	private NodeExecuter<KEYIN, VALUEIN, KEYOUT, VALUEOUT> executer;
+
+	private SimpleNode<KEYIN, VALUEIN, KEYOUT, VALUEOUT> myself;
+
+	private Node<KEYOUT, VALUEOUT, ?, ?> lowerNode;
+
+	private Storer<KEYOUT, VALUEOUT> lowerStorer;
+
+	private boolean first;
+
+	/**
+	 * 构造函数
+	 *
+	 * @param nodeName
+	 * @param nodeExecuter
+	 * @param context
+	 */
+	public SimpleNode(String nodeName, NodeExecuter<KEYIN, VALUEIN, KEYOUT, VALUEOUT> executer,
+			TaskContext context) {
+		super();
+		this.nodeName = nodeName;
+		this.executer = executer;
+		this.context = context;
+		this.myself = this;
+	}
+
+	/**
+	 * 启动
+	 *
+	 * @author WANGWEI
+	 */
+	@Override
+	public void start() {
+		this.startNodeExecuters();
+
+		this.startLogThread();
+	}
+
+	/**
+	 * 创建节点执行器
+	 *
+	 * @author WANGWEI
+	 */
+	private void startNodeExecuters() {
+		new Thread(() -> {
+
+			while (true) {
+				LOG.info(new StringBuilder("[" + nodeName + "]. ").append("start ... ...")
+						.toString());
+				counter = new Counter();
+				if (isFirst()) {
+					execute(null, null);
+				} else {
+					Set<Entry<KEYIN, VALUEIN>> entrySet = getStorer().getEntrySet();
+
+					for (Entry<KEYIN, VALUEIN> entry : entrySet) {
+						execute(entry.getKey(), entry.getValue());
+					}
+				}
+
+				try {
+					Util.sleep(getCyclePeriod());
+				} catch (Exception e) {
+					LOG.error("sleep Exception.", e);
+				}
+
+			}
+
+		}, "NODE-" + nodeName).start();
+	}
+
+	private void execute(KEYIN key, VALUEIN value) {
+		try {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug(new StringBuilder("[" + nodeName + "]. ").append("handle entry. key=")
+						.append(null == key ? null : key.toString()).append("; value=")
+						.append(null == value ? null : value.toString()).toString());
+			}
+			counter.incrementTotal();
+			executer.execute(key, value, getLowerStorer(), context);
+			counter.incrementSuccessAmount();
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug(new StringBuilder("[" + nodeName + "]. ")
+						.append("handle entry successfully. key=")
+						.append(null == key ? null : key.toString()).append("; value=")
+						.append(null == value ? null : value.toString()).toString());
+			}
+		} catch (Exception e) {
+			counter.incrementFailureAmount();
+			if (LOG.isErrorEnabled()) {
+				LOG.error(new StringBuilder("[" + nodeName + "]. ")
+						.append("fail to handle entry. key=")
+						.append(null == key ? null : key.toString()).append("; value=")
+						.append(null == value ? null : value.toString()).toString(), e);
+			}
+		}
+	}
+
+	/**
+	 * 启动日志线程
+	 *
+	 * @author WANGWEI
+	 */
+	private void startLogThread() {
+		new Thread(() -> {
+			while (true) {
+
+				Util.sleep(TimeUnit.SECONDS, 2);
+				if (LOG.isInfoEnabled()) {
+					LOG.debug(new StringBuilder("[" + nodeName + "]. ").append("status: ")
+							.append(myself.getNodeStatusInfo()).toString());
+				}
+			}
+
+		}, "NODE-" + nodeName).start();
+	}
+
+	private String getNodeStatusInfo() {
+		long total = 0;
+		long successAmount = 0;
+		long failureAmount = 0;
+		Counter c = this.counter;
+		if (null != c) {
+			total = c.getTotal();
+			successAmount = c.getSuccessAmount();
+			failureAmount = c.getFailureAmount();
+		}
+		return new StringBuilder("Total=").append(total).append(",SuccessAmount=")
+				.append(successAmount).append(",FailureAmount=").append(failureAmount).toString();
+	}
+
+	@Override
+	public void setLowerNode(Node<KEYOUT, VALUEOUT, ?, ?> lowerNode) {
+		this.lowerNode = lowerNode;
+		lowerStorer = this.lowerNode.getStorer();
+	}
+
+	@Override
+	public Node<KEYOUT, VALUEOUT, ?, ?> getLowerNode() {
+		return this.lowerNode;
+	}
+
+	@Override
+	public Storer<KEYIN, VALUEIN> getStorer() {
+		return this.storer;
+	}
+
+	public int getCyclePeriod() {
+		return cyclePeriod;
+	}
+
+	@Override
+	public void setCyclePeriod(int cyclePeriod) {
+		this.cyclePeriod = cyclePeriod;
+	}
+
+	public Storer<KEYOUT, VALUEOUT> getLowerStorer() {
+		return lowerStorer;
+	}
+
+	public boolean isFirst() {
+		return first;
+	}
+
+	@Override
+	public void setFirst(boolean first) {
+		this.first = first;
+	}
+
+}

+ 36 - 0
src/main/java/cn/com/qmth/examcloud/commons/helpers/pipeline/Storer.java

@@ -0,0 +1,36 @@
+package cn.com.qmth.examcloud.commons.helpers.pipeline;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import com.google.common.collect.Maps;
+
+/**
+ * 节点存储器
+ *
+ * @author WANGWEI
+ * @date 2019年12月12日
+ * @Copyright (c) 2018-2020 WANGWEI [QQ:522080330] All Rights Reserved.
+ */
+public class Storer<KEY, VALUE> {
+
+	private Map<KEY, VALUE> MAP = Maps.newConcurrentMap();
+
+	private Map<KEY, VALUE> BUFFER_MAP = Maps.newConcurrentMap();
+
+	public synchronized Set<Entry<KEY, VALUE>> getEntrySet() {
+		MAP.putAll(BUFFER_MAP);
+		BUFFER_MAP.clear();
+		return MAP.entrySet();
+	}
+
+	public synchronized void putElement(KEY key, VALUE value) {
+		BUFFER_MAP.put(key, value);
+	}
+
+	public synchronized boolean isEmpty() {
+		return MAP.isEmpty();
+	}
+
+}

+ 34 - 0
src/main/java/cn/com/qmth/examcloud/commons/helpers/pipeline/TaskContext.java

@@ -0,0 +1,34 @@
+package cn.com.qmth.examcloud.commons.helpers.pipeline;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 任务上下文(任务参数)
+ *
+ * @author WANGWEI
+ * @date 2019年12月12日
+ * @Copyright (c) 2018-2020 WANGWEI [QQ:522080330] All Rights Reserved.
+ */
+public class TaskContext implements Serializable {
+	private static final long serialVersionUID = 4979254175922604943L;
+
+	private final Map<String, Object> props = new ConcurrentHashMap<String, Object>();
+
+	public String get(String name) {
+		return get(name, String.class);
+	}
+
+	public <T> T get(String name, Class<T> t) {
+		Object value = props.get(name);
+		@SuppressWarnings("unchecked")
+		T ret = (T) value;
+		return ret;
+	}
+
+	public void put(String name, Object bean) {
+		props.put(name, bean);
+	}
+
+}