123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- package cn.com.qmth.am.multithread;
- import java.util.List;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.CopyOnWriteArrayList;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicInteger;
- import com.qmth.boot.core.exception.StatusException;
- import cn.com.qmth.am.utils.Calculator;
- public class Basket<T> {
- private String taskName;
- private Integer total = 0;
- private AtomicInteger process = new AtomicInteger(0);
- private List<String> result = new CopyOnWriteArrayList<>();
- private List<T> failed = new CopyOnWriteArrayList<>();
- /**
- * 数据阻塞队列
- */
- private BlockingQueue<Object> queue;
- /**
- * 多线程计数器,子线程都结束后主线程才继续执行
- */
- private CountDownLatch endGate;
- /**
- * 消费者数量
- */
- private int consumerCount;
- /**
- * 判断线程执行是否有出错,生产者、消费者出错都需要修改此值为true
- */
- private boolean isExcuteError = false;
- public Basket(int consumerCount, String taskName) {
- this.consumerCount = consumerCount;
- this.taskName = taskName;
- queue = new ArrayBlockingQueue<Object>(consumerCount * 2);
- endGate = new CountDownLatch(consumerCount);
- }
- /**
- * 生产数据,不采用put方法防止消费线程全部异常后生产线程阻塞
- *
- * @param value
- * @throws InterruptedException
- */
- protected void offer(final Object value) throws InterruptedException {
- if (isExcuteError) {
- throw new StatusException("线程异常");
- } else {
- boolean ret = queue.offer(value, 5, TimeUnit.SECONDS);
- if (!ret) {
- this.offer(value);
- }
- }
- }
- /**
- * 消费数据,不采用take方法防止生产线程全部异常后消费线程阻塞
- *
- * @return
- * @throws InterruptedException
- */
- protected Object consume() throws InterruptedException {
- if (isExcuteError) {
- return new EndObject();
- } else {
- Object ob = queue.poll(5, TimeUnit.SECONDS);
- if (ob == null) {
- return this.consume();
- } else {
- return ob;
- }
- }
- }
- protected void endGateReset() {
- endGate = new CountDownLatch(consumerCount);
- }
- protected void await() throws InterruptedException {
- endGate.await();
- }
- protected void countDown() {
- endGate.countDown();
- }
- protected boolean isExcuteError() {
- return isExcuteError;
- }
- protected void setExcuteError(boolean isExcuteError) {
- this.isExcuteError = isExcuteError;
- }
- protected int getConsumerCount() {
- return consumerCount;
- }
- protected void setConsumerCount(int consumerCount) {
- this.consumerCount = consumerCount;
- }
- public Integer getTotal() {
- return total;
- }
- protected void setTotal(Integer total) {
- this.total = total;
- }
- public AtomicInteger getProcess() {
- return process;
- }
- protected void updateProcess(int add) {
- process.addAndGet(add);
- }
- public String getProgress() {
- if (total == 0) {
- return "0%";
- }
- Double d = Calculator.divide(process.doubleValue(), total.doubleValue(), 4);
- Double f = Calculator.multiply(d, 100);
- return f + "%";
- }
- public List<String> getMsgs() {
- return result;
- }
- public void addMsg(String msg) {
- result.add(msg);
- }
- public String getTaskName() {
- return taskName;
- }
- public void addFailDto(T t) {
- failed.add(t);
- }
- public List<T> getFaildDto() {
- return failed;
- }
- }
|