package cn.com.qmth.multithread; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; public class Basket { private static Logger logger = Logger.getLogger(Basket.class); /** * 数据阻塞队列 */ private BlockingQueue queue; /** * 多线程计数器,子线程都结束后主线程才继续执行 */ private CountDownLatch endGate; /** * 消费者数量 */ private int consumerCount; /** * 判断线程执行是否有出错,生产者、消费者出错都需要修改此值为true */ private boolean isExcuteError = false; public Basket(int consumerCount) { this.consumerCount=consumerCount; queue = new ArrayBlockingQueue(consumerCount*2); endGate = new CountDownLatch(consumerCount); } /** * 生产数据,不采用put方法防止消费线程全部异常后生产线程阻塞 * @param value * @throws InterruptedException */ protected void offer(final Object value) throws InterruptedException { if(isExcuteError) { logger.error("**********************offer isExcuteError threadId:"+Thread.currentThread().getId()); throw new StatusException("1000001","线程异常"); }else { boolean ret=queue.offer(value, 1, TimeUnit.MINUTES); if(!ret) { // logger.info("**********************offer time out threadId:"+Thread.currentThread().getId()+value); this.offer(value); } } } /** * 消费数据,不采用take方法防止生产线程全部异常后消费线程阻塞 * @return * @throws InterruptedException */ protected Object consume() throws InterruptedException { if(isExcuteError) { logger.error("**********************poll isExcuteError threadId:"+Thread.currentThread().getId()); return new EndObject(); }else { Object ob=queue.poll(1, TimeUnit.MINUTES); if(ob==null) { // logger.info("**********************poll time out threadId:"+Thread.currentThread().getId()); return this.consume(); }else { return ob; } } } protected void await() throws InterruptedException { endGate.await(); } protected void countDown() { endGate.countDown(); } protected boolean isExcuteError() { return isExcuteError; } protected void setExcuteError(boolean isExcuteError) { this.isExcuteError = isExcuteError; } protected int getConsumerCount() { return consumerCount; } protected void setConsumerCount(int consumerCount) { this.consumerCount = consumerCount; } }