|
@@ -1,138 +1,174 @@
|
|
|
package cn.com.qmth.multithread;
|
|
|
|
|
|
+import java.lang.reflect.ParameterizedType;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
-import org.apache.log4j.Logger;
|
|
|
-import org.apache.logging.log4j.ThreadContext;
|
|
|
-
|
|
|
-public abstract class Producer {
|
|
|
- private static final Logger logger = Logger.getLogger(Producer.class);
|
|
|
- private Basket basket;
|
|
|
-
|
|
|
- /**
|
|
|
- * 业务参数
|
|
|
- */
|
|
|
- private Map<String, Object> param;
|
|
|
-
|
|
|
- /**
|
|
|
- * 消费线程class
|
|
|
- */
|
|
|
- private Class<? extends Consumer<?>> consumer;
|
|
|
-
|
|
|
- public List<Consumer<?>> consumers=new ArrayList<>();
|
|
|
- /**
|
|
|
- * 处理开始方法
|
|
|
- * @param consumer 消费线程class
|
|
|
- * @param consumerCount 消费线程数
|
|
|
- * @param param 生产者业务参数
|
|
|
- * @throws InstantiationException
|
|
|
- * @throws IllegalAccessException
|
|
|
- */
|
|
|
- public void startDispose(Class<? extends Consumer<?>> consumer, int consumerCount,Map<String, Object> param)
|
|
|
- throws InstantiationException, IllegalAccessException {
|
|
|
- Basket basket = new Basket(consumerCount);
|
|
|
- this.basket = basket;
|
|
|
- this.consumer = consumer;
|
|
|
- this.param=param;
|
|
|
- //启动消费者
|
|
|
- startConsumer();
|
|
|
- //开始处理
|
|
|
- dispose();
|
|
|
- }
|
|
|
-
|
|
|
- private void dispose() {
|
|
|
- try {
|
|
|
- logger.info("*******************Producer:开始处理");
|
|
|
- // 生产数据
|
|
|
- produce(param);
|
|
|
- logger.info("*******************Producer:生产结束");
|
|
|
- // 发送生产结束信息
|
|
|
- endConsumer();
|
|
|
- logger.info("*******************Producer:成功发送生产结束信息");
|
|
|
- // 等待子线程结束
|
|
|
- logger.info("*******************Producer:等待消费线程结束");
|
|
|
- await();
|
|
|
- logger.info("*******************Producer:消费线程已结束");
|
|
|
- // 判断子线程是否正常结束
|
|
|
- if (basket.isExcuteError()) {
|
|
|
- throw new StatusException("1000001", "处理失败,线程异常");
|
|
|
- }
|
|
|
- logger.info("*******************Producer:结束处理");
|
|
|
- } catch (StatusException e) {
|
|
|
- // 获取异常时发送异常结束信息
|
|
|
- endConsumerAsError();
|
|
|
- throw e;
|
|
|
- } catch (Exception e) {
|
|
|
- // 获取异常时发送异常结束信息
|
|
|
- endConsumerAsError();
|
|
|
- throw new StatusException("1000002", "处理失败", e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 启动消费者
|
|
|
- *
|
|
|
- * @param consumer
|
|
|
- * @throws InstantiationException
|
|
|
- * @throws IllegalAccessException
|
|
|
- */
|
|
|
- private void startConsumer() throws InstantiationException, IllegalAccessException {
|
|
|
- int count = basket.getConsumerCount();
|
|
|
- for (int i = 0; i < count; i++) {
|
|
|
- Consumer<?> co = (Consumer<?>) consumer.newInstance();
|
|
|
- co.setBasket(basket);
|
|
|
- co.setTraceId(ThreadContext.get("TRACE_ID"));
|
|
|
- consumers.add(co);
|
|
|
- co.start();
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 出异常后修改标识
|
|
|
- *
|
|
|
- */
|
|
|
- private void endConsumerAsError() {
|
|
|
- basket.setExcuteError(true);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 正常结束消费者
|
|
|
- *
|
|
|
- * @throws InterruptedException
|
|
|
- */
|
|
|
- private void endConsumer() throws InterruptedException {
|
|
|
- int count = basket.getConsumerCount();
|
|
|
- EndObject eo = new EndObject();
|
|
|
- for (int i = 0; i < count; i++) {
|
|
|
- basket.offer(eo);
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 生产数据
|
|
|
- *
|
|
|
- * @param ob
|
|
|
- * @throws InterruptedException
|
|
|
- */
|
|
|
- protected void offer(Object ob) throws InterruptedException {
|
|
|
- synchronized (basket) {
|
|
|
- basket.offer(ob);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 等待所有消费者结束
|
|
|
- *
|
|
|
- * @throws InterruptedException
|
|
|
- */
|
|
|
- private void await() throws InterruptedException {
|
|
|
- basket.await();
|
|
|
- }
|
|
|
-
|
|
|
- protected abstract void produce(Map<String, Object> param) throws Exception;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import cn.com.qmth.im.StatusException;
|
|
|
+
|
|
|
+public abstract class Producer<T, C extends Consumer<T>> {
|
|
|
+
|
|
|
+ private static final Logger LOG = LoggerFactory.getLogger(Producer.class);
|
|
|
+
|
|
|
+ private Basket basket;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 消费线程class
|
|
|
+ */
|
|
|
+ private List<C> consumers;
|
|
|
+
|
|
|
+ public void startDispose(int consumerCount) {
|
|
|
+ startDispose(consumerCount, null, 0);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理开始方法
|
|
|
+ */
|
|
|
+ public void startDispose(int consumerCount, Map<String, Object> param) {
|
|
|
+ startDispose(consumerCount, param, 0);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void startDispose(int consumerCount, int total) {
|
|
|
+ startDispose(consumerCount, null, total);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void startDispose(int consumerCount, Map<String, Object> param, int total) {
|
|
|
+ // 启动消费者
|
|
|
+ startConsumer(consumerCount, total, null);
|
|
|
+ // 开始处理
|
|
|
+ dispose(param);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void startDispose(int consumerCount, Map<String, Object> param, int total, ProcessCount pc) {
|
|
|
+ // 启动消费者
|
|
|
+ startConsumer(consumerCount, total, pc);
|
|
|
+ // 开始处理
|
|
|
+ dispose(param);
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ private void startConsumer(int consumerCount, int total, ProcessCount pc) {
|
|
|
+ if (consumerCount <= 0) {
|
|
|
+ consumerCount = 1;
|
|
|
+ }
|
|
|
+ ParameterizedType pt = (ParameterizedType) this.getClass().getGenericSuperclass();
|
|
|
+ Class<C> clazz = (Class<C>) pt.getActualTypeArguments()[1];
|
|
|
+ consumers = new ArrayList<>();
|
|
|
+ this.basket = new Basket(consumerCount, getTaskName());
|
|
|
+ basket.setTotal(total);
|
|
|
+ if (pc == null) {
|
|
|
+ basket.setProcess(new ProcessCount());
|
|
|
+ } else {
|
|
|
+ basket.setProcess(pc);
|
|
|
+ }
|
|
|
+ // 启动消费者
|
|
|
+ int count = basket.getConsumerCount();
|
|
|
+ try {
|
|
|
+ for (int i = 0; i < count; i++) {
|
|
|
+ C co = (C) clazz.newInstance();
|
|
|
+ co.setBasket(basket);
|
|
|
+ co.start();
|
|
|
+ consumers.add(co);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void dispose(Map<String, Object> param) {
|
|
|
+ try {
|
|
|
+ // 生产数据
|
|
|
+ produce(param);
|
|
|
+ // 发送生产结束信息
|
|
|
+ endConsumer();
|
|
|
+ // 等待子线程结束
|
|
|
+ await();
|
|
|
+ // 判断子线程是否正常结束
|
|
|
+ if (basket.isExcuteError()) {
|
|
|
+ throw new StatusException("处理失败,线程异常");
|
|
|
+ }
|
|
|
+ } catch (StatusException e) {
|
|
|
+ LOG.error(e.getMessage(), e);
|
|
|
+ // 获取异常时发送异常结束信息
|
|
|
+ endConsumerAsError();
|
|
|
+ throw e;
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error(e.getMessage(), e);
|
|
|
+ // 获取异常时发送异常结束信息
|
|
|
+ endConsumerAsError();
|
|
|
+ throw new StatusException("处理失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ protected abstract void produce(Map<String, Object> param) throws Exception;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 生产数据
|
|
|
+ *
|
|
|
+ * @param ob
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+ protected void offer(T ob) throws InterruptedException {
|
|
|
+ synchronized (basket) {
|
|
|
+ basket.offer(ob);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 出异常后修改标识
|
|
|
+ *
|
|
|
+ */
|
|
|
+ private void endConsumerAsError() {
|
|
|
+ basket.setExcuteError(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 正常结束消费者
|
|
|
+ *
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+ private void endConsumer() throws InterruptedException {
|
|
|
+ int count = basket.getConsumerCount();
|
|
|
+ EndObject eo = new EndObject();
|
|
|
+ for (int i = 0; i < count; i++) {
|
|
|
+ basket.offer(eo);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 等待所有消费者结束
|
|
|
+ *
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+ private void await() throws InterruptedException {
|
|
|
+ basket.await();
|
|
|
+ }
|
|
|
+
|
|
|
+ protected abstract String getTaskName();
|
|
|
+
|
|
|
+ public List<String> getMsgs() {
|
|
|
+ return this.basket.getMsgs();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setTotal(int total) {
|
|
|
+ this.basket.setTotal(total);
|
|
|
+ }
|
|
|
+
|
|
|
+ public Integer getTotal() {
|
|
|
+ return this.basket.getTotal();
|
|
|
+ }
|
|
|
+
|
|
|
+ public AtomicInteger getProcess() {
|
|
|
+ return this.basket.getProcess();
|
|
|
+ }
|
|
|
+
|
|
|
+ public List<C> getConsumers() {
|
|
|
+ return consumers;
|
|
|
+ }
|
|
|
}
|