package cn.com.qmth.im.multithread; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import cn.com.qmth.im.bean.Calculator; import cn.com.qmth.im.bean.StatusException; public class Basket { private ProcessCount process; private String taskName; private Integer total = 0; private List result = new CopyOnWriteArrayList<>(); /** * 数据阻塞队列 */ private BlockingQueue queue; /** * 多线程计数器,子线程都结束后主线程才继续执行 */ private CountDownLatch endGate; /** * 消费者数量 */ private int consumerCount; /** * 判断线程执行是否有出错,生产者、消费者出错都需要修改此值为true */ private boolean isExcuteError = false; public Basket(int consumerCount, String taskName) { this.consumerCount = consumerCount; this.taskName = taskName; queue = new ArrayBlockingQueue(consumerCount * 2); endGate = new CountDownLatch(consumerCount); } /** * 生产数据,不采用put方法防止消费线程全部异常后生产线程阻塞 * * @param value * @throws InterruptedException */ protected void offer(final Object value) throws InterruptedException { if (isExcuteError) { throw new StatusException("线程异常"); } else { boolean ret = queue.offer(value, 5, TimeUnit.SECONDS); if (!ret) { this.offer(value); } } } /** * 消费数据,不采用take方法防止生产线程全部异常后消费线程阻塞 * * @return * @throws InterruptedException */ protected Object consume() throws InterruptedException { if (isExcuteError) { return new EndObject(); } else { Object ob = queue.poll(5, TimeUnit.SECONDS); if (ob == null) { return this.consume(); } else { return ob; } } } protected void endGateReset() { endGate = new CountDownLatch(consumerCount); } protected void await() throws InterruptedException { endGate.await(); } protected void countDown() { endGate.countDown(); } protected boolean isExcuteError() { return isExcuteError; } protected void setExcuteError(boolean isExcuteError) { this.isExcuteError = isExcuteError; } protected int getConsumerCount() { return consumerCount; } protected void setConsumerCount(int consumerCount) { this.consumerCount = consumerCount; } public Integer getTotal() { return total; } protected void setTotal(Integer total) { this.total = total; } public AtomicInteger getProcess() { return process.getProcess(); } protected void updateProcess(int add) { process.getProcess().addAndGet(add); } public String getProgress() { if (total == 0) { return "0%"; } Double d = Calculator.divide(process.getProcess().doubleValue(), total.doubleValue(), 4); Double f = Calculator.multiply(d, 100); return f + "%"; } public List getMsgs() { return result; } public void addMsg(String msg) { result.add(msg); } public String getTaskName() { return taskName; } protected void setProcess(ProcessCount process) { this.process = process; } }