wangwei 5 жил өмнө
parent
commit
78b2f4797a

+ 3 - 1
src/main/java/cn/com/qmth/examcloud/commons/helpers/pipeline/NodeExecuter.java

@@ -1,5 +1,7 @@
 package cn.com.qmth.examcloud.commons.helpers.pipeline;
 
+import cn.com.qmth.examcloud.commons.helpers.KeyValuePair;
+
 /**
  * 节点执行器
  *
@@ -13,7 +15,7 @@ package cn.com.qmth.examcloud.commons.helpers.pipeline;
  */
 public interface NodeExecuter<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
 
-	void execute(KEYIN key, VALUEIN value, Storer<KEYOUT, VALUEOUT> storer, TaskContext context)
+	KeyValuePair<KEYOUT, VALUEOUT> execute(KEYIN key, VALUEIN value, TaskContext context)
 			throws Exception;
 
 }

+ 9 - 2
src/main/java/cn/com/qmth/examcloud/commons/helpers/pipeline/SimpleNode.java

@@ -6,6 +6,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.logging.log4j.ThreadContext;
 
+import cn.com.qmth.examcloud.commons.helpers.KeyValuePair;
 import cn.com.qmth.examcloud.commons.logging.ExamCloudLog;
 import cn.com.qmth.examcloud.commons.logging.ExamCloudLogFactory;
 import cn.com.qmth.examcloud.commons.util.ThreadLocalUtil;
@@ -97,7 +98,6 @@ public class SimpleNode<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 
 					for (Entry<KEYIN, VALUEIN> entry : entrySet) {
 						execute(entry.getKey(), entry.getValue());
-						getStorer().remove(entry.getKey());
 					}
 				}
 
@@ -122,7 +122,14 @@ public class SimpleNode<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 						.append(null == value ? null : value.toString()).toString());
 			}
 			counter.incrementTotal();
-			executer.execute(key, value, getLowerStorer(), context);
+			KeyValuePair<KEYOUT, VALUEOUT> out = executer.execute(key, value, context);
+
+			if (null != out && null != getLowerStorer()) {
+				getLowerStorer().putElement(out.getKey(), out.getValue());
+			}
+
+			getStorer().remove(key);
+
 			counter.incrementSuccessAmount();
 
 			if (LOG.isDebugEnabled()) {