Basket.java 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package cn.com.qmth.am.multithread;
  2. import java.util.List;
  3. import java.util.concurrent.ArrayBlockingQueue;
  4. import java.util.concurrent.BlockingQueue;
  5. import java.util.concurrent.CopyOnWriteArrayList;
  6. import java.util.concurrent.CountDownLatch;
  7. import java.util.concurrent.TimeUnit;
  8. import java.util.concurrent.atomic.AtomicInteger;
  9. import com.qmth.boot.core.exception.StatusException;
  10. import cn.com.qmth.am.utils.Calculator;
  11. public class Basket<T> {
  12. private String taskName;
  13. private Integer total = 0;
  14. private AtomicInteger process = new AtomicInteger(0);
  15. private List<String> result = new CopyOnWriteArrayList<>();
  16. private List<T> failed = new CopyOnWriteArrayList<>();
  17. /**
  18. * 数据阻塞队列
  19. */
  20. private BlockingQueue<Object> queue;
  21. /**
  22. * 多线程计数器,子线程都结束后主线程才继续执行
  23. */
  24. private CountDownLatch endGate;
  25. /**
  26. * 消费者数量
  27. */
  28. private int consumerCount;
  29. /**
  30. * 判断线程执行是否有出错,生产者、消费者出错都需要修改此值为true
  31. */
  32. private boolean isExcuteError = false;
  33. public Basket(int consumerCount, String taskName) {
  34. this.consumerCount = consumerCount;
  35. this.taskName = taskName;
  36. queue = new ArrayBlockingQueue<Object>(consumerCount * 2);
  37. endGate = new CountDownLatch(consumerCount);
  38. }
  39. /**
  40. * 生产数据,不采用put方法防止消费线程全部异常后生产线程阻塞
  41. *
  42. * @param value
  43. * @throws InterruptedException
  44. */
  45. protected void offer(final Object value) throws InterruptedException {
  46. if (isExcuteError) {
  47. throw new StatusException("线程异常");
  48. } else {
  49. boolean ret = queue.offer(value, 5, TimeUnit.SECONDS);
  50. if (!ret) {
  51. this.offer(value);
  52. }
  53. }
  54. }
  55. /**
  56. * 消费数据,不采用take方法防止生产线程全部异常后消费线程阻塞
  57. *
  58. * @return
  59. * @throws InterruptedException
  60. */
  61. protected Object consume() throws InterruptedException {
  62. if (isExcuteError) {
  63. return new EndObject();
  64. } else {
  65. Object ob = queue.poll(5, TimeUnit.SECONDS);
  66. if (ob == null) {
  67. return this.consume();
  68. } else {
  69. return ob;
  70. }
  71. }
  72. }
  73. protected void endGateReset() {
  74. endGate = new CountDownLatch(consumerCount);
  75. }
  76. protected void await() throws InterruptedException {
  77. endGate.await();
  78. }
  79. protected void countDown() {
  80. endGate.countDown();
  81. }
  82. protected boolean isExcuteError() {
  83. return isExcuteError;
  84. }
  85. protected void setExcuteError(boolean isExcuteError) {
  86. this.isExcuteError = isExcuteError;
  87. }
  88. protected int getConsumerCount() {
  89. return consumerCount;
  90. }
  91. protected void setConsumerCount(int consumerCount) {
  92. this.consumerCount = consumerCount;
  93. }
  94. public Integer getTotal() {
  95. return total;
  96. }
  97. protected void setTotal(Integer total) {
  98. this.total = total;
  99. }
  100. public AtomicInteger getProcess() {
  101. return process;
  102. }
  103. protected void updateProcess(int add) {
  104. process.addAndGet(add);
  105. }
  106. public String getProgress() {
  107. if (total == 0) {
  108. return "0%";
  109. }
  110. Double d = Calculator.divide(process.doubleValue(), total.doubleValue(), 4);
  111. Double f = Calculator.multiply(d, 100);
  112. return f + "%";
  113. }
  114. public List<String> getMsgs() {
  115. return result;
  116. }
  117. public void addMsg(String msg) {
  118. result.add(msg);
  119. }
  120. public String getTaskName() {
  121. return taskName;
  122. }
  123. public void addFailDto(T t) {
  124. failed.add(t);
  125. }
  126. public List<T> getFaildDto() {
  127. return failed;
  128. }
  129. }