WANG 6 年之前
父节点
当前提交
d395a9ed89

+ 177 - 0
src/main/java/cn/com/qmth/examcloud/commons/helpers/concurrency/simple/ConcurrentTask.java

@@ -0,0 +1,177 @@
+package cn.com.qmth.examcloud.commons.helpers.concurrency.simple;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLog;
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLogFactory;
+import cn.com.qmth.examcloud.commons.util.Util;
+
+/**
+ * 并发任务
+ *
+ * @author WANGWEI
+ * @date 2019年6月19日
+ * @Copyright (c) 2018-2020 WANGWEI [QQ:522080330] All Rights Reserved.
+ */
+public class ConcurrentTask {
+
+	private static final ExamCloudLog LOG = ExamCloudLogFactory.getLog(ConcurrentTask.class);
+
+	private BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(10000);
+
+	private BlockingQueue<Integer> workerMessages = new LinkedBlockingQueue<Integer>(10000);
+
+	private final WorkerController workerController = new WorkerController();
+
+	/**
+	 * 最大线程数
+	 */
+	private int maxActiveThreadSize = 5;
+
+	/**
+	 * 不死线程数
+	 */
+	private int minThreadSize = 2;
+
+	private ThreadPoolExecutor threadPoolExecutor;
+
+	/**
+	 * 处理者
+	 */
+	private Worker worker;
+
+	/**
+	 * 巡检周期
+	 */
+	private int inspectionPeriod = 60;
+
+	/**
+	 * 添加处理元素
+	 *
+	 * @author WANGWEI
+	 * @param e
+	 * @return
+	 */
+	public boolean offerElement(Object e) {
+		return queue.offer(e);
+	}
+
+	/**
+	 * 启动任务
+	 *
+	 * @author WANGWEI
+	 */
+	public void start() {
+		threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxActiveThreadSize);
+
+		// 创建不死线程
+		for (int i = 0; i < minThreadSize; i++) {
+			addWorkerThread(true);
+		}
+
+		// 巡检线程
+		Thread inspectionThread = new Thread() {
+
+			@Override
+			public void run() {
+				Util.sleep(inspectionPeriod);
+				while (true) {
+
+					int size = queue.size();
+					int activeCount = threadPoolExecutor.getActiveCount();
+
+					int warnCount = workerController.getWarnCount();
+					// 巡检周期内(因并发超出限制导致的)警告数量未超过100时,增加一个worker
+					if (warnCount <= 100) {
+						if (100 < size && maxActiveThreadSize > activeCount) {
+							addWorkerThread(false);
+						}
+						Util.sleep(inspectionPeriod);
+					} else {
+						// 巡检周期内(因并发超出限制导致的)警告数量超过100时,减少一个worker,并重置警告数量
+						workerMessages.offer(warnCount);
+						workerController.resetWarnCount();
+						Util.sleep(inspectionPeriod * 10);
+					}
+
+				}
+			};
+
+		};
+
+		inspectionThread.setDaemon(true);
+		inspectionThread.start();
+	}
+
+	/**
+	 * 添加处理线程
+	 *
+	 * @author WANGWEI
+	 * @param immortal
+	 */
+	private void addWorkerThread(final boolean immortal) {
+		LOG.info("create a new worker. immortal=" + immortal);
+		Thread thread = new Thread() {
+
+			long nullTimes = 0;
+
+			@Override
+			public void run() {
+				while (true) {
+					Object el = queue.poll();
+					if (null == el) {
+						nullTimes++;
+						if (10 <= nullTimes) {
+							if (immortal) {
+								Util.sleep(10);
+								continue;
+							} else {
+								LOG.info("no element.worker exist.");
+								break;
+							}
+						} else {
+							Util.sleep(5);
+							continue;
+						}
+					} else {
+						nullTimes = 0;
+					}
+
+					try {
+						worker.process(workerController, el);
+					} catch (Exception e) {
+						LOG.error("unexpected exception", e);
+					}
+
+					// 非不死线程在抢到终止消息时,结束线程
+					if (!immortal) {
+						Integer warnCount = workerMessages.poll();
+						if (null != warnCount) {
+							LOG.info("worker exist. warnCount=" + warnCount);
+							break;
+						}
+					}
+
+				}
+			}
+		};
+
+		threadPoolExecutor.execute(thread);
+	}
+
+	public void setMaxActiveThreadSize(int maxActiveThreadSize) {
+		this.maxActiveThreadSize = maxActiveThreadSize;
+	}
+
+	public void setMinThreadSize(int minThreadSize) {
+		this.minThreadSize = minThreadSize;
+	}
+
+	public void setWorker(Worker worker) {
+		this.worker = worker;
+	}
+
+}

+ 7 - 0
src/main/java/cn/com/qmth/examcloud/commons/helpers/concurrency/simple/Worker.java

@@ -0,0 +1,7 @@
+package cn.com.qmth.examcloud.commons.helpers.concurrency.simple;
+
+public interface Worker {
+
+	void process(final WorkerController controller, Object element);
+
+}

+ 27 - 0
src/main/java/cn/com/qmth/examcloud/commons/helpers/concurrency/simple/WorkerController.java

@@ -0,0 +1,27 @@
+package cn.com.qmth.examcloud.commons.helpers.concurrency.simple;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLog;
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLogFactory;
+
+public class WorkerController {
+
+	private static final ExamCloudLog LOG = ExamCloudLogFactory.getLog(WorkerController.class);
+
+	private AtomicInteger warnCount = new AtomicInteger(0);
+
+	public void addConcurrencyWarn() {
+		LOG.info("warn count ++");
+		this.warnCount.incrementAndGet();
+	}
+
+	public int getWarnCount() {
+		return this.warnCount.get();
+	}
+
+	public void resetWarnCount() {
+		warnCount.set(0);
+	}
+
+}