package cn.com.qmth.am.multithread; import java.lang.reflect.ParameterizedType; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.qmth.boot.core.exception.StatusException; import cn.com.qmth.am.utils.SpringContextHolder; public abstract class Producer> { private static final Logger LOG = LoggerFactory.getLogger(Producer.class); private Basket basket; /** * 消费线程class */ private List> consumers; public void startDispose(int consumerCount) { startDispose(consumerCount, null, 0); } /** * 处理开始方法 */ public void startDispose(int consumerCount, Map param) { startDispose(consumerCount, param, 0); } public void startDispose(int consumerCount, int total) { startDispose(consumerCount, null, total); } public void startDispose(int consumerCount, Map param, int total) { // 启动消费者 startConsumer(consumerCount, total); // 开始处理 dispose(param); } @SuppressWarnings("unchecked") private void startConsumer(int consumerCount, int total) { if (consumerCount <= 0) { consumerCount = 1; } ParameterizedType pt = (ParameterizedType) this.getClass().getGenericSuperclass(); Class clazz = (Class) pt.getActualTypeArguments()[1]; consumers = new ArrayList<>(); this.basket = new Basket(consumerCount, getTaskName()); basket.setTotal(total); // 启动消费者 int count = basket.getConsumerCount(); for (int i = 0; i < count; i++) { Consumer co = SpringContextHolder.getBean(clazz); co.setBasket(basket); co.setConsumer(co); co.start(); consumers.add((Consumer) AopTargetUtils.getTarget(co)); } } private void dispose(Map param) { try { // 生产数据 int index = 0; for (;;) { T dto = findData(param, index); if (dto == null) { // 拿不到数据,结束消费 break; } offer(dto); index++; } // 发送生产结束信息 endConsumer(); // 等待子线程结束 await(); // 判断子线程是否正常结束 if (basket.isExcuteError()) { throw new StatusException("处理失败,线程异常"); } } catch (StatusException e) { LOG.error(e.getMessage(), e); // 获取异常时发送异常结束信息 endConsumerAsError(); throw e; } catch (Exception e) { LOG.error(e.getMessage(), e); // 获取异常时发送异常结束信息 endConsumerAsError(); throw new StatusException("处理失败", e); } } /** * 出异常后修改标识 * */ private void endConsumerAsError() { basket.setExcuteError(true); } /** * 正常结束消费者 * * @throws InterruptedException */ private void endConsumer() throws InterruptedException { int count = basket.getConsumerCount(); EndObject eo = new EndObject(); for (int i = 0; i < count; i++) { basket.offer(eo); } } /** * 生产数据 * * @param ob * @throws InterruptedException */ private void offer(Object ob) throws InterruptedException { synchronized (basket) { basket.offer(ob); } } /** * 等待所有消费者结束 * * @throws InterruptedException */ private void await() throws InterruptedException { basket.await(); } protected abstract T findData(Map param, int index); protected abstract String getTaskName(); public List getMsgs() { return this.basket.getMsgs(); } public Integer getTotal() { return this.basket.getTotal(); } public AtomicInteger getProcess() { return this.basket.getProcess(); } public List getFaildDto() { return this.basket.getFaildDto(); } }