123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- package cn.com.qmth.am.multithread;
- import java.lang.reflect.ParameterizedType;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.atomic.AtomicInteger;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import com.qmth.boot.core.exception.StatusException;
- import cn.com.qmth.am.utils.SpringContextHolder;
- public abstract class Producer<T, C extends Consumer<T>> {
- private static final Logger LOG = LoggerFactory.getLogger(Producer.class);
- private Basket<T> basket;
- /**
- * 消费线程class
- */
- private List<Consumer<T>> consumers;
- public void startDispose(int consumerCount) {
- startDispose(consumerCount, null, 0);
- }
- /**
- * 处理开始方法
- */
- public void startDispose(int consumerCount, Map<String, Object> param) {
- startDispose(consumerCount, param, 0);
- }
- public void startDispose(int consumerCount, int total) {
- startDispose(consumerCount, null, total);
- }
- public void startDispose(int consumerCount, Map<String, Object> param, int total) {
- // 启动消费者
- startConsumer(consumerCount, total);
- // 开始处理
- dispose(param);
- }
- @SuppressWarnings("unchecked")
- private void startConsumer(int consumerCount, int total) {
- if (consumerCount <= 0) {
- consumerCount = 1;
- }
- ParameterizedType pt = (ParameterizedType) this.getClass().getGenericSuperclass();
- Class<C> clazz = (Class<C>) pt.getActualTypeArguments()[1];
- consumers = new ArrayList<>();
- this.basket = new Basket<T>(consumerCount, getTaskName());
- basket.setTotal(total);
- // 启动消费者
- int count = basket.getConsumerCount();
- for (int i = 0; i < count; i++) {
- Consumer<T> co = SpringContextHolder.getBean(clazz);
- co.setBasket(basket);
- co.setConsumer(co);
- co.start();
- consumers.add((Consumer<T>) AopTargetUtils.getTarget(co));
- }
- }
- private void dispose(Map<String, Object> param) {
- try {
- // 生产数据
- int index = 0;
- for (;;) {
- T dto = findData(param, index);
- if (dto == null) {
- // 拿不到数据,结束消费
- break;
- }
- offer(dto);
- index++;
- }
- // 发送生产结束信息
- endConsumer();
- // 等待子线程结束
- await();
- // 判断子线程是否正常结束
- if (basket.isExcuteError()) {
- throw new StatusException("处理失败,线程异常");
- }
- } catch (StatusException e) {
- LOG.error(e.getMessage(), e);
- // 获取异常时发送异常结束信息
- endConsumerAsError();
- throw e;
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- // 获取异常时发送异常结束信息
- endConsumerAsError();
- throw new StatusException("处理失败", e);
- }
- }
- /**
- * 出异常后修改标识
- *
- */
- private void endConsumerAsError() {
- basket.setExcuteError(true);
- }
- /**
- * 正常结束消费者
- *
- * @throws InterruptedException
- */
- private void endConsumer() throws InterruptedException {
- int count = basket.getConsumerCount();
- EndObject eo = new EndObject();
- for (int i = 0; i < count; i++) {
- basket.offer(eo);
- }
- }
- /**
- * 生产数据
- *
- * @param ob
- * @throws InterruptedException
- */
- private void offer(Object ob) throws InterruptedException {
- synchronized (basket) {
- basket.offer(ob);
- }
- }
- /**
- * 等待所有消费者结束
- *
- * @throws InterruptedException
- */
- private void await() throws InterruptedException {
- basket.await();
- }
- protected abstract T findData(Map<String, Object> param, int index);
- protected abstract String getTaskName();
- public List<String> getMsgs() {
- return this.basket.getMsgs();
- }
- public Integer getTotal() {
- return this.basket.getTotal();
- }
- public AtomicInteger getProcess() {
- return this.basket.getProcess();
- }
- public List<T> getFaildDto() {
- return this.basket.getFaildDto();
- }
- }
|