Producer.java 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package cn.com.qmth.im.multithread;
  2. import java.lang.reflect.ParameterizedType;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import java.util.Map;
  6. import java.util.concurrent.atomic.AtomicInteger;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import cn.com.qmth.im.bean.StatusException;
  10. public abstract class Producer<T, C extends Consumer<T>> {
  11. private static final Logger LOG = LoggerFactory.getLogger(Producer.class);
  12. private Basket basket;
  13. /**
  14. * 消费线程class
  15. */
  16. private List<C> consumers;
  17. public void startDispose(int consumerCount) {
  18. startDispose(consumerCount, null, 0);
  19. }
  20. /**
  21. * 处理开始方法
  22. */
  23. public void startDispose(int consumerCount, Map<String, Object> param) {
  24. startDispose(consumerCount, param, 0);
  25. }
  26. public void startDispose(int consumerCount, int total) {
  27. startDispose(consumerCount, null, total);
  28. }
  29. public void startDispose(int consumerCount, Map<String, Object> param, int total) {
  30. // 启动消费者
  31. startConsumer(consumerCount, total, null);
  32. // 开始处理
  33. dispose(param);
  34. }
  35. public void startDispose(int consumerCount, Map<String, Object> param, int total, ProcessCount pc) {
  36. // 启动消费者
  37. startConsumer(consumerCount, total, pc);
  38. // 开始处理
  39. dispose(param);
  40. }
  41. @SuppressWarnings("unchecked")
  42. private void startConsumer(int consumerCount, int total, ProcessCount pc) {
  43. if (consumerCount <= 0) {
  44. consumerCount = 1;
  45. }
  46. ParameterizedType pt = (ParameterizedType) this.getClass().getGenericSuperclass();
  47. Class<C> clazz = (Class<C>) pt.getActualTypeArguments()[1];
  48. consumers = new ArrayList<>();
  49. this.basket = new Basket(consumerCount, getTaskName());
  50. basket.setTotal(total);
  51. if (pc == null) {
  52. basket.setProcess(new ProcessCount());
  53. } else {
  54. basket.setProcess(pc);
  55. }
  56. // 启动消费者
  57. int count = basket.getConsumerCount();
  58. try {
  59. for (int i = 0; i < count; i++) {
  60. C co = (C) clazz.newInstance();
  61. co.setBasket(basket);
  62. co.start();
  63. consumers.add(co);
  64. }
  65. } catch (Exception e) {
  66. throw new RuntimeException(e);
  67. }
  68. }
  69. private void dispose(Map<String, Object> param) {
  70. try {
  71. // 生产数据
  72. produce(param);
  73. // 发送生产结束信息
  74. endConsumer();
  75. // 等待子线程结束
  76. await();
  77. // 判断子线程是否正常结束
  78. if (basket.isExcuteError()) {
  79. throw new StatusException("处理失败,线程异常");
  80. }
  81. } catch (StatusException e) {
  82. LOG.error(e.getMessage(), e);
  83. // 获取异常时发送异常结束信息
  84. endConsumerAsError();
  85. throw e;
  86. } catch (Exception e) {
  87. LOG.error(e.getMessage(), e);
  88. // 获取异常时发送异常结束信息
  89. endConsumerAsError();
  90. throw new StatusException("处理失败", e);
  91. }
  92. }
  93. protected abstract void produce(Map<String, Object> param) throws Exception;
  94. /**
  95. * 生产数据
  96. *
  97. * @param ob
  98. * @throws InterruptedException
  99. */
  100. protected void offer(T ob) throws InterruptedException {
  101. synchronized (basket) {
  102. basket.offer(ob);
  103. }
  104. }
  105. /**
  106. * 出异常后修改标识
  107. *
  108. */
  109. private void endConsumerAsError() {
  110. basket.setExcuteError(true);
  111. }
  112. /**
  113. * 正常结束消费者
  114. *
  115. * @throws InterruptedException
  116. */
  117. private void endConsumer() throws InterruptedException {
  118. int count = basket.getConsumerCount();
  119. EndObject eo = new EndObject();
  120. for (int i = 0; i < count; i++) {
  121. basket.offer(eo);
  122. }
  123. }
  124. /**
  125. * 等待所有消费者结束
  126. *
  127. * @throws InterruptedException
  128. */
  129. private void await() throws InterruptedException {
  130. basket.await();
  131. }
  132. protected abstract String getTaskName();
  133. public List<String> getMsgs() {
  134. return this.basket.getMsgs();
  135. }
  136. public void setTotal(int total) {
  137. this.basket.setTotal(total);
  138. }
  139. public Integer getTotal() {
  140. return this.basket.getTotal();
  141. }
  142. public AtomicInteger getProcess() {
  143. return this.basket.getProcess();
  144. }
  145. public List<C> getConsumers() {
  146. return consumers;
  147. }
  148. }