Producer.java 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package cn.com.qmth.am.multithread;
  2. import java.lang.reflect.ParameterizedType;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import java.util.Map;
  6. import java.util.concurrent.atomic.AtomicInteger;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import com.qmth.boot.core.exception.StatusException;
  10. import cn.com.qmth.am.utils.SpringContextHolder;
  11. public abstract class Producer<T, C extends Consumer<T>> {
  12. private static final Logger LOG = LoggerFactory.getLogger(Producer.class);
  13. private Basket<T> basket;
  14. /**
  15. * 消费线程class
  16. */
  17. private List<Consumer<T>> consumers;
  18. public void startDispose(int consumerCount) {
  19. startDispose(consumerCount, null, 0);
  20. }
  21. /**
  22. * 处理开始方法
  23. */
  24. public void startDispose(int consumerCount, Map<String, Object> param) {
  25. startDispose(consumerCount, param, 0);
  26. }
  27. public void startDispose(int consumerCount, int total) {
  28. startDispose(consumerCount, null, total);
  29. }
  30. public void startDispose(int consumerCount, Map<String, Object> param, int total) {
  31. // 启动消费者
  32. startConsumer(consumerCount, total);
  33. // 开始处理
  34. dispose(param);
  35. }
  36. @SuppressWarnings("unchecked")
  37. private void startConsumer(int consumerCount, int total) {
  38. if (consumerCount <= 0) {
  39. consumerCount = 1;
  40. }
  41. ParameterizedType pt = (ParameterizedType) this.getClass().getGenericSuperclass();
  42. Class<C> clazz = (Class<C>) pt.getActualTypeArguments()[1];
  43. consumers = new ArrayList<>();
  44. this.basket = new Basket<T>(consumerCount, getTaskName());
  45. basket.setTotal(total);
  46. // 启动消费者
  47. int count = basket.getConsumerCount();
  48. for (int i = 0; i < count; i++) {
  49. Consumer<T> co = SpringContextHolder.getBean(clazz);
  50. co.setBasket(basket);
  51. co.setConsumer(co);
  52. co.start();
  53. consumers.add((Consumer<T>) AopTargetUtils.getTarget(co));
  54. }
  55. }
  56. private void dispose(Map<String, Object> param) {
  57. try {
  58. // 生产数据
  59. int index = 0;
  60. for (;;) {
  61. T dto = findData(param, index);
  62. if (dto == null) {
  63. // 拿不到数据,结束消费
  64. break;
  65. }
  66. offer(dto);
  67. index++;
  68. }
  69. // 发送生产结束信息
  70. endConsumer();
  71. // 等待子线程结束
  72. await();
  73. // 判断子线程是否正常结束
  74. if (basket.isExcuteError()) {
  75. throw new StatusException("处理失败,线程异常");
  76. }
  77. } catch (StatusException e) {
  78. LOG.error(e.getMessage(), e);
  79. // 获取异常时发送异常结束信息
  80. endConsumerAsError();
  81. throw e;
  82. } catch (Exception e) {
  83. LOG.error(e.getMessage(), e);
  84. // 获取异常时发送异常结束信息
  85. endConsumerAsError();
  86. throw new StatusException("处理失败", e);
  87. }
  88. }
  89. /**
  90. * 出异常后修改标识
  91. *
  92. */
  93. private void endConsumerAsError() {
  94. basket.setExcuteError(true);
  95. }
  96. /**
  97. * 正常结束消费者
  98. *
  99. * @throws InterruptedException
  100. */
  101. private void endConsumer() throws InterruptedException {
  102. int count = basket.getConsumerCount();
  103. EndObject eo = new EndObject();
  104. for (int i = 0; i < count; i++) {
  105. basket.offer(eo);
  106. }
  107. }
  108. /**
  109. * 生产数据
  110. *
  111. * @param ob
  112. * @throws InterruptedException
  113. */
  114. private void offer(Object ob) throws InterruptedException {
  115. synchronized (basket) {
  116. basket.offer(ob);
  117. }
  118. }
  119. /**
  120. * 等待所有消费者结束
  121. *
  122. * @throws InterruptedException
  123. */
  124. private void await() throws InterruptedException {
  125. basket.await();
  126. }
  127. protected abstract T findData(Map<String, Object> param, int index);
  128. protected abstract String getTaskName();
  129. public List<String> getMsgs() {
  130. return this.basket.getMsgs();
  131. }
  132. public Integer getTotal() {
  133. return this.basket.getTotal();
  134. }
  135. public AtomicInteger getProcess() {
  136. return this.basket.getProcess();
  137. }
  138. public List<T> getFaildDto() {
  139. return this.basket.getFaildDto();
  140. }
  141. }