|
@@ -0,0 +1,140 @@
|
|
|
+package cn.com.qmth.scancloud.tools.multithread;
|
|
|
+
|
|
|
+import java.lang.reflect.ParameterizedType;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+import cn.com.qmth.scancloud.tools.utils.SpringContextHolder;
|
|
|
+import cn.com.qmth.scancloud.tools.utils.StatusException;
|
|
|
+
|
|
|
+public abstract class Producer<T, C extends Consumer<T>> {
|
|
|
+ private static final Logger LOG = LoggerFactory.getLogger(Producer.class);
|
|
|
+ private Basket basket;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 消费线程class
|
|
|
+ */
|
|
|
+ private List<Consumer<T>> consumers;
|
|
|
+
|
|
|
+ public void startDispose(int consumerCount){
|
|
|
+ startDispose(consumerCount, null,0);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理开始方法
|
|
|
+ */
|
|
|
+ public void startDispose(int consumerCount, Map<String, Object> param) {
|
|
|
+ startDispose(consumerCount, param,0);
|
|
|
+ }
|
|
|
+ public void startDispose(int consumerCount,int total) {
|
|
|
+ startDispose(consumerCount, null,total);
|
|
|
+ }
|
|
|
+ public void startDispose(int consumerCount, Map<String, Object> param,int total) {
|
|
|
+ // 启动消费者
|
|
|
+ startConsumer(consumerCount,total);
|
|
|
+ // 开始处理
|
|
|
+ dispose(param);
|
|
|
+ }
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ private void startConsumer(int consumerCount,int total) {
|
|
|
+ if (consumerCount <= 0) {
|
|
|
+ consumerCount = 1;
|
|
|
+ }
|
|
|
+ ParameterizedType pt = (ParameterizedType) this.getClass().getGenericSuperclass();
|
|
|
+ Class<C> clazz = (Class<C>) pt.getActualTypeArguments()[1];
|
|
|
+ consumers = new ArrayList<>();
|
|
|
+ this.basket = new Basket(consumerCount);
|
|
|
+ basket.setTotal(total);
|
|
|
+ // 启动消费者
|
|
|
+ int count = basket.getConsumerCount();
|
|
|
+ for (int i = 0; i < count; i++) {
|
|
|
+ Consumer<T> co = SpringContextHolder.getBean(clazz);
|
|
|
+ co.setBasket(basket);
|
|
|
+ co.start();
|
|
|
+ consumers.add((Consumer<T>) AopTargetUtils.getTarget(co));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void dispose(Map<String, Object> param) {
|
|
|
+ try {
|
|
|
+ // 生产数据
|
|
|
+ int index = 0;
|
|
|
+ for (;;) {
|
|
|
+ T dto = findData(param, index);
|
|
|
+ if (dto == null) {
|
|
|
+ // 拿不到数据,结束消费
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ offer(dto);
|
|
|
+ index++;
|
|
|
+ }
|
|
|
+ // 发送生产结束信息
|
|
|
+ endConsumer();
|
|
|
+ // 等待子线程结束
|
|
|
+ await();
|
|
|
+ // 判断子线程是否正常结束
|
|
|
+ if (basket.isExcuteError()) {
|
|
|
+ throw new StatusException("处理失败,线程异常");
|
|
|
+ }
|
|
|
+ } catch (StatusException e) {
|
|
|
+ LOG.error(e.getMessage(),e);
|
|
|
+ // 获取异常时发送异常结束信息
|
|
|
+ endConsumerAsError();
|
|
|
+ throw e;
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error(e.getMessage(),e);
|
|
|
+ // 获取异常时发送异常结束信息
|
|
|
+ endConsumerAsError();
|
|
|
+ throw new StatusException("处理失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 出异常后修改标识
|
|
|
+ *
|
|
|
+ */
|
|
|
+ private void endConsumerAsError() {
|
|
|
+ basket.setExcuteError(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 正常结束消费者
|
|
|
+ *
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+ private void endConsumer() throws InterruptedException {
|
|
|
+ int count = basket.getConsumerCount();
|
|
|
+ EndObject eo = new EndObject();
|
|
|
+ for (int i = 0; i < count; i++) {
|
|
|
+ basket.offer(eo);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 生产数据
|
|
|
+ *
|
|
|
+ * @param ob
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+ private void offer(Object ob) throws InterruptedException {
|
|
|
+ synchronized (basket) {
|
|
|
+ basket.offer(ob);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 等待所有消费者结束
|
|
|
+ *
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+ private void await() throws InterruptedException {
|
|
|
+ basket.await();
|
|
|
+ }
|
|
|
+
|
|
|
+ protected abstract T findData(Map<String, Object> param, int index);
|
|
|
+}
|