Basket.java 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package cn.com.qmth.multithread;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.concurrent.CountDownLatch;
  5. import java.util.concurrent.TimeUnit;
  6. import org.apache.log4j.Logger;
  7. public class Basket {
  8. private static Logger logger = Logger.getLogger(Basket.class);
  9. /**
  10. * 数据阻塞队列
  11. */
  12. private BlockingQueue<Object> queue;
  13. /**
  14. * 多线程计数器,子线程都结束后主线程才继续执行
  15. */
  16. private CountDownLatch endGate;
  17. /**
  18. * 消费者数量
  19. */
  20. private int consumerCount;
  21. /**
  22. * 判断线程执行是否有出错,生产者、消费者出错都需要修改此值为true
  23. */
  24. private boolean isExcuteError = false;
  25. public Basket(int consumerCount) {
  26. this.consumerCount=consumerCount;
  27. queue = new ArrayBlockingQueue<Object>(consumerCount*2);
  28. endGate = new CountDownLatch(consumerCount);
  29. }
  30. /**
  31. * 生产数据,不采用put方法防止消费线程全部异常后生产线程阻塞
  32. * @param value
  33. * @throws InterruptedException
  34. */
  35. protected void offer(final Object value) throws InterruptedException {
  36. if(isExcuteError) {
  37. logger.error("**********************offer isExcuteError threadId:"+Thread.currentThread().getId());
  38. throw new StatusException("1000001","线程异常");
  39. }else {
  40. boolean ret=queue.offer(value, 1, TimeUnit.MINUTES);
  41. if(!ret) {
  42. // logger.info("**********************offer time out threadId:"+Thread.currentThread().getId()+value);
  43. this.offer(value);
  44. }
  45. }
  46. }
  47. /**
  48. * 消费数据,不采用take方法防止生产线程全部异常后消费线程阻塞
  49. * @return
  50. * @throws InterruptedException
  51. */
  52. protected Object consume() throws InterruptedException {
  53. if(isExcuteError) {
  54. logger.error("**********************poll isExcuteError threadId:"+Thread.currentThread().getId());
  55. return new EndObject();
  56. }else {
  57. Object ob=queue.poll(1, TimeUnit.MINUTES);
  58. if(ob==null) {
  59. // logger.info("**********************poll time out threadId:"+Thread.currentThread().getId());
  60. return this.consume();
  61. }else {
  62. return ob;
  63. }
  64. }
  65. }
  66. protected void await() throws InterruptedException {
  67. endGate.await();
  68. }
  69. protected void countDown() {
  70. endGate.countDown();
  71. }
  72. protected boolean isExcuteError() {
  73. return isExcuteError;
  74. }
  75. protected void setExcuteError(boolean isExcuteError) {
  76. this.isExcuteError = isExcuteError;
  77. }
  78. protected int getConsumerCount() {
  79. return consumerCount;
  80. }
  81. protected void setConsumerCount(int consumerCount) {
  82. this.consumerCount = consumerCount;
  83. }
  84. }