Basket.java 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package cn.com.qmth.im.multithread;
  2. import java.util.List;
  3. import java.util.concurrent.ArrayBlockingQueue;
  4. import java.util.concurrent.BlockingQueue;
  5. import java.util.concurrent.CopyOnWriteArrayList;
  6. import java.util.concurrent.CountDownLatch;
  7. import java.util.concurrent.TimeUnit;
  8. import java.util.concurrent.atomic.AtomicInteger;
  9. import cn.com.qmth.im.bean.Calculator;
  10. import cn.com.qmth.im.bean.StatusException;
  11. public class Basket {
  12. private ProcessCount process;
  13. private String taskName;
  14. private Integer total = 0;
  15. private List<String> result = new CopyOnWriteArrayList<>();
  16. /**
  17. * 数据阻塞队列
  18. */
  19. private BlockingQueue<Object> queue;
  20. /**
  21. * 多线程计数器,子线程都结束后主线程才继续执行
  22. */
  23. private CountDownLatch endGate;
  24. /**
  25. * 消费者数量
  26. */
  27. private int consumerCount;
  28. /**
  29. * 判断线程执行是否有出错,生产者、消费者出错都需要修改此值为true
  30. */
  31. private boolean isExcuteError = false;
  32. public Basket(int consumerCount, String taskName) {
  33. this.consumerCount = consumerCount;
  34. this.taskName = taskName;
  35. queue = new ArrayBlockingQueue<Object>(consumerCount * 2);
  36. endGate = new CountDownLatch(consumerCount);
  37. }
  38. /**
  39. * 生产数据,不采用put方法防止消费线程全部异常后生产线程阻塞
  40. *
  41. * @param value
  42. * @throws InterruptedException
  43. */
  44. protected void offer(final Object value) throws InterruptedException {
  45. if (isExcuteError) {
  46. throw new StatusException("线程异常");
  47. } else {
  48. boolean ret = queue.offer(value, 5, TimeUnit.SECONDS);
  49. if (!ret) {
  50. this.offer(value);
  51. }
  52. }
  53. }
  54. /**
  55. * 消费数据,不采用take方法防止生产线程全部异常后消费线程阻塞
  56. *
  57. * @return
  58. * @throws InterruptedException
  59. */
  60. protected Object consume() throws InterruptedException {
  61. if (isExcuteError) {
  62. return new EndObject();
  63. } else {
  64. Object ob = queue.poll(5, TimeUnit.SECONDS);
  65. if (ob == null) {
  66. return this.consume();
  67. } else {
  68. return ob;
  69. }
  70. }
  71. }
  72. protected void endGateReset() {
  73. endGate = new CountDownLatch(consumerCount);
  74. }
  75. protected void await() throws InterruptedException {
  76. endGate.await();
  77. }
  78. protected void countDown() {
  79. endGate.countDown();
  80. }
  81. protected boolean isExcuteError() {
  82. return isExcuteError;
  83. }
  84. protected void setExcuteError(boolean isExcuteError) {
  85. this.isExcuteError = isExcuteError;
  86. }
  87. protected int getConsumerCount() {
  88. return consumerCount;
  89. }
  90. protected void setConsumerCount(int consumerCount) {
  91. this.consumerCount = consumerCount;
  92. }
  93. public Integer getTotal() {
  94. return total;
  95. }
  96. protected void setTotal(Integer total) {
  97. this.total = total;
  98. }
  99. public AtomicInteger getProcess() {
  100. return process.getProcess();
  101. }
  102. protected void updateProcess(int add) {
  103. process.getProcess().addAndGet(add);
  104. }
  105. public String getProgress() {
  106. if (total == 0) {
  107. return "0%";
  108. }
  109. Double d = Calculator.divide(process.getProcess().doubleValue(), total.doubleValue(), 4);
  110. Double f = Calculator.multiply(d, 100);
  111. return f + "%";
  112. }
  113. public List<String> getMsgs() {
  114. return result;
  115. }
  116. public void addMsg(String msg) {
  117. result.add(msg);
  118. }
  119. public String getTaskName() {
  120. return taskName;
  121. }
  122. protected void setProcess(ProcessCount process) {
  123. this.process = process;
  124. }
  125. }