xiatian 2 жил өмнө
parent
commit
e3364c7b6c

+ 0 - 54
src/main/java/cn/com/qmth/scancloud/tools/multithread/BatchConsumer.java

@@ -1,54 +0,0 @@
-package cn.com.qmth.scancloud.tools.multithread;
-
-import java.util.concurrent.locks.LockSupport;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class BatchConsumer<T>  extends Thread{
-	private static final Logger LOG = LoggerFactory.getLogger(BatchConsumer.class);
-	private Basket basket;
-	
-	public BatchConsumer() {
-	}
-	@SuppressWarnings("unchecked")
-	@Override
-	public void run() {
-		try {
-			for (;;) {
-				//先判断是否有异常结束
-				if(basket.isExcuteError()) {
-					break;
-				}
-				//取消费数据
-				Object o= basket.consume();
-				//判断消费数据是否是结束
-				if(o instanceof EndObject) {
-					basket.countDown();
-					break;
-				}
-				if(o instanceof ParkObject) {
-					basket.countDown();
-					LockSupport.park();
-				}else {
-					T t=(T)o;
-					//消费数据实现
-					consume(t);
-				}
-			}
-		} catch (Exception e) {
-			basket.setExcuteError(true);
-			LOG.error("消费线程处理出错",e);
-		} finally {
-			basket.countDown();
-		}
-	}
-	public abstract void consume(T t);
-	protected Basket getBasket() {
-		return basket;
-	}
-	protected void setBasket(Basket basket) {
-		this.basket = basket;
-	}
-	
-}

+ 0 - 170
src/main/java/cn/com/qmth/scancloud/tools/multithread/BatchProducer.java

@@ -1,170 +0,0 @@
-package cn.com.qmth.scancloud.tools.multithread;
-
-import java.lang.reflect.ParameterizedType;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.LockSupport;
-
-import org.apache.commons.collections4.CollectionUtils;
-
-import cn.com.qmth.scancloud.tools.utils.SpringContextHolder;
-import cn.com.qmth.scancloud.tools.utils.StatusException;
-
-/**
- * @param <T> 生产消费对象
- * @param <C> 消费线程类
- */
-public abstract class BatchProducer<T, C extends BatchConsumer<T>> {
-
-    private List<BatchConsumer<T>> consumers;
-
-    private Basket basket;
-
-    public void startDispose(int consumerCount) throws InterruptedException {
-        startDispose(consumerCount, null);
-    }
-
-    /**
-     * 处理开始方法
-     *
-     * @param consumerCount 消费线程数
-     * @param param         生产者业务参数
-     */
-    public void startDispose(int consumerCount, Map<String, Object> param) throws InterruptedException {
-        // 启动消费者
-        try {
-            startConsumer(consumerCount);
-            for (; ; ) {
-                List<T> dtos = findOneBatchData(param);
-                if (CollectionUtils.isEmpty(dtos)) {
-                    // 拿不到数据,结束消费
-                    endConsumer();
-                    break;
-                }
-                // 开始处理一轮
-                disposeBatch(dtos);
-                //重置计数器
-                basket.endGateReset();
-                // 唤醒消费者
-                unParkConsumer();
-            }
-        } catch (StatusException e) {
-            // 获取异常时发送异常结束信息
-            endConsumerAsError();
-            throw e;
-        } catch (Exception e) {
-            // 获取异常时发送异常结束信息
-            endConsumerAsError();
-            throw new StatusException("处理失败", e);
-        }
-    }
-
-    /**
-     * 获取一个批次数据,都被消费处理完毕后才再取下一批
-     *
-     * @param param
-     * @return
-     */
-    public abstract List<T> findOneBatchData(Map<String, Object> param);
-
-    @SuppressWarnings("unchecked")
-    private void startConsumer(int consumerCount) {
-        if (consumerCount <= 0) {
-            consumerCount = 1;
-        }
-        ParameterizedType pt = (ParameterizedType) this.getClass().getGenericSuperclass();
-        Class<C> clazz = (Class<C>) pt.getActualTypeArguments()[1];
-        consumers = new ArrayList<>();
-        this.basket = new Basket(consumerCount);
-        // 启动消费者
-        int count = basket.getConsumerCount();
-        for (int i = 0; i < count; i++) {
-            BatchConsumer<T> co = SpringContextHolder.getBean(clazz);
-            co.setBasket(basket);
-            co.start();
-            consumers.add((BatchConsumer<T>) AopTargetUtils.getTarget(co));
-        }
-    }
-
-    private void disposeBatch(List<T> dtos) throws InterruptedException {
-        // 生产数据
-        for (T t : dtos) {
-            offer(t);
-        }
-        // 发送一轮结束信息
-        parkConsumer();
-        // 等待子线程结束
-        await();
-        // 判断子线程是否正常结束
-        if (basket.isExcuteError()) {
-            throw new StatusException("处理失败,线程异常");
-        }
-    }
-
-    /**
-     * 出异常后修改标识
-     */
-    private void endConsumerAsError() {
-        basket.setExcuteError(true);
-    }
-
-    /**
-     * 正常结束消费者
-     *
-     * @throws InterruptedException
-     */
-    private void endConsumer() throws InterruptedException {
-        int count = basket.getConsumerCount();
-        EndObject eo = new EndObject();
-        for (int i = 0; i < count; i++) {
-            basket.offer(eo);
-        }
-
-    }
-
-    /**
-     * 正常暂停消费者
-     *
-     * @throws InterruptedException
-     */
-    private void parkConsumer() throws InterruptedException {
-        int count = basket.getConsumerCount();
-        ParkObject eo = new ParkObject();
-        for (int i = 0; i < count; i++) {
-            basket.offer(eo);
-        }
-
-    }
-
-    /**
-     * 正常唤醒消费者
-     *
-     * @throws InterruptedException
-     */
-    private void unParkConsumer() {
-        for (BatchConsumer<T> c : consumers) {
-            LockSupport.unpark(c);
-        }
-    }
-
-    /**
-     * 生产数据
-     *
-     * @param ob
-     * @throws InterruptedException
-     */
-    private void offer(Object ob) throws InterruptedException {
-        basket.offer(ob);
-    }
-
-    /**
-     * 等待所有消费者结束
-     *
-     * @throws InterruptedException
-     */
-    private void await() throws InterruptedException {
-        basket.await();
-    }
-
-}

+ 0 - 10
src/main/java/cn/com/qmth/scancloud/tools/multithread/ParkObject.java

@@ -1,10 +0,0 @@
-package cn.com.qmth.scancloud.tools.multithread;
-
-/**
- * 消费一轮完毕标识对象
- * @author xiatian
- *
- */
-public class ParkObject {
-
-}