123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- package cn.com.qmth.multithread;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.TimeUnit;
- import org.apache.log4j.Logger;
- public class Basket {
- private static Logger logger = Logger.getLogger(Basket.class);
- /**
- * 数据阻塞队列
- */
- private BlockingQueue<Object> queue;
-
- /**
- * 多线程计数器,子线程都结束后主线程才继续执行
- */
- private CountDownLatch endGate;
-
- /**
- * 消费者数量
- */
- private int consumerCount;
-
-
- /**
- * 判断线程执行是否有出错,生产者、消费者出错都需要修改此值为true
- */
- private boolean isExcuteError = false;
-
-
- public Basket(int consumerCount) {
- this.consumerCount=consumerCount;
- queue = new ArrayBlockingQueue<Object>(consumerCount*2);
- endGate = new CountDownLatch(consumerCount);
- }
- /**
- * 生产数据,不采用put方法防止消费线程全部异常后生产线程阻塞
- * @param value
- * @throws InterruptedException
- */
- protected void offer(final Object value) throws InterruptedException {
- if(isExcuteError) {
- logger.error("**********************offer isExcuteError threadId:"+Thread.currentThread().getId());
- throw new StatusException("1000001","线程异常");
- }else {
- boolean ret=queue.offer(value, 1, TimeUnit.MINUTES);
- if(!ret) {
- // logger.info("**********************offer time out threadId:"+Thread.currentThread().getId()+value);
- this.offer(value);
- }
- }
- }
- /**
- * 消费数据,不采用take方法防止生产线程全部异常后消费线程阻塞
- * @return
- * @throws InterruptedException
- */
- protected Object consume() throws InterruptedException {
- if(isExcuteError) {
- logger.error("**********************poll isExcuteError threadId:"+Thread.currentThread().getId());
- return new EndObject();
- }else {
- Object ob=queue.poll(1, TimeUnit.MINUTES);
- if(ob==null) {
- // logger.info("**********************poll time out threadId:"+Thread.currentThread().getId());
- return this.consume();
- }else {
- return ob;
- }
- }
- }
-
- protected void await() throws InterruptedException {
- endGate.await();
- }
- protected void countDown() {
- endGate.countDown();
- }
- protected boolean isExcuteError() {
- return isExcuteError;
- }
- protected void setExcuteError(boolean isExcuteError) {
- this.isExcuteError = isExcuteError;
- }
- protected int getConsumerCount() {
- return consumerCount;
- }
- protected void setConsumerCount(int consumerCount) {
- this.consumerCount = consumerCount;
- }
- }
|