|
@@ -10,143 +10,152 @@ import org.apache.logging.log4j.ThreadContext;
|
|
|
|
|
|
public abstract class Producer {
|
|
|
|
|
|
- private static Logger LOG = LogManager.getLogger(MyProducer.class);
|
|
|
-
|
|
|
- private Basket basket;
|
|
|
-
|
|
|
- private List<Consumer<?>> consumers;
|
|
|
-
|
|
|
- /**
|
|
|
- * 业务参数
|
|
|
- */
|
|
|
- private Map<String, Object> param;
|
|
|
-
|
|
|
- /**
|
|
|
- * 消费线程class
|
|
|
- */
|
|
|
- private Class<? extends Consumer<?>> consumer;
|
|
|
- /**
|
|
|
- * 处理开始方法
|
|
|
- * @param consumer 消费线程class
|
|
|
- * @param consumerCount 消费线程数
|
|
|
- * @param param 生产者业务参数
|
|
|
- * @throws InstantiationException
|
|
|
- * @throws IllegalAccessException
|
|
|
- */
|
|
|
- public void startDispose(Class<? extends Consumer<?>> consumer, int consumerCount,Map<String, Object> param)
|
|
|
- throws InstantiationException, IllegalAccessException {
|
|
|
- this.consumers=new ArrayList<Consumer<?>>();
|
|
|
- Basket basket = new Basket(consumerCount);
|
|
|
- this.basket = basket;
|
|
|
- this.consumer = consumer;
|
|
|
- this.param=param;
|
|
|
- //启动消费者
|
|
|
- startConsumer();
|
|
|
- //开始处理
|
|
|
- dispose();
|
|
|
- }
|
|
|
-
|
|
|
- private void dispose() {
|
|
|
- try {
|
|
|
- LOG.info("*******************Producer:开始处理");
|
|
|
- // 生产数据
|
|
|
- produce(param);
|
|
|
- LOG.info("*******************Producer:生产结束");
|
|
|
- // 发送生产结束信息
|
|
|
- endConsumer();
|
|
|
- LOG.info("*******************Producer:成功发送生产结束信息");
|
|
|
- // 等待子线程结束
|
|
|
- LOG.info("*******************Producer:等待消费线程结束");
|
|
|
- await();
|
|
|
- LOG.info("*******************Producer:消费线程已结束");
|
|
|
- // 判断子线程是否正常结束
|
|
|
- if (basket.isExcuteError()) {
|
|
|
- throw new StatusException("1000001", "处理失败,线程异常");
|
|
|
- }
|
|
|
- LOG.info("*******************Producer:结束处理");
|
|
|
- } catch (StatusException e) {
|
|
|
- // 获取异常时发送异常结束信息
|
|
|
- endConsumerAsError();
|
|
|
- throw e;
|
|
|
- } catch (Exception e) {
|
|
|
- // 获取异常时发送异常结束信息
|
|
|
- endConsumerAsError();
|
|
|
- throw new StatusException("1000002", "处理失败", e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 启动消费者
|
|
|
- *
|
|
|
- * @param consumer
|
|
|
- * @throws InstantiationException
|
|
|
- * @throws IllegalAccessException
|
|
|
- */
|
|
|
- private void startConsumer() throws InstantiationException, IllegalAccessException {
|
|
|
- int count = basket.getConsumerCount();
|
|
|
- for (int i = 0; i < count; i++) {
|
|
|
- Consumer<?> co = (Consumer<?>) consumer.newInstance();
|
|
|
- co.initResult();
|
|
|
- co.setBasket(basket);
|
|
|
- co.setTraceId(ThreadContext.get("TRACE_ID"));
|
|
|
- consumers.add(co);
|
|
|
- co.start();
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 出异常后修改标识
|
|
|
- *
|
|
|
- */
|
|
|
- private void endConsumerAsError() {
|
|
|
- basket.setExcuteError(true);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 正常结束消费者
|
|
|
- *
|
|
|
- * @throws InterruptedException
|
|
|
- */
|
|
|
- private void endConsumer() throws InterruptedException {
|
|
|
- int count = basket.getConsumerCount();
|
|
|
- EndObject eo = new EndObject();
|
|
|
- for (int i = 0; i < count; i++) {
|
|
|
- basket.offer(eo);
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 生产数据
|
|
|
- *
|
|
|
- * @param ob
|
|
|
- * @throws InterruptedException
|
|
|
- */
|
|
|
- protected void offer(Object ob) throws InterruptedException {
|
|
|
- synchronized (basket) {
|
|
|
- basket.offer(ob);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 等待所有消费者结束
|
|
|
- *
|
|
|
- * @throws InterruptedException
|
|
|
- */
|
|
|
- private void await() throws InterruptedException {
|
|
|
- basket.await();
|
|
|
- }
|
|
|
-
|
|
|
- protected abstract void produce(Map<String, Object> param) throws Exception;
|
|
|
-
|
|
|
- public List<Consumer<?>> getConsumers() {
|
|
|
- return consumers;
|
|
|
- }
|
|
|
-
|
|
|
- public void setConsumers(List<Consumer<?>> consumers) {
|
|
|
- this.consumers = consumers;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
+ private static Logger LOG = LogManager.getLogger(MyProducer.class);
|
|
|
+
|
|
|
+ private Basket basket;
|
|
|
+
|
|
|
+ private List<Consumer<?>> consumers;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 业务参数
|
|
|
+ */
|
|
|
+ private Map<String, Object> param;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 消费线程class
|
|
|
+ */
|
|
|
+ private Class<? extends Consumer<?>> consumer;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理开始方法
|
|
|
+ *
|
|
|
+ * @param consumer
|
|
|
+ * 消费线程class
|
|
|
+ * @param consumerCount
|
|
|
+ * 消费线程数
|
|
|
+ * @param param
|
|
|
+ * 生产者业务参数
|
|
|
+ * @throws InstantiationException
|
|
|
+ * @throws IllegalAccessException
|
|
|
+ */
|
|
|
+ public void startDispose(Class<? extends Consumer<?>> consumer, int consumerCount, Map<String, Object> param)
|
|
|
+ throws InstantiationException, IllegalAccessException {
|
|
|
+ this.consumers = new ArrayList<Consumer<?>>();
|
|
|
+ Basket basket = new Basket(consumerCount);
|
|
|
+ this.basket = basket;
|
|
|
+ this.consumer = consumer;
|
|
|
+ this.param = param;
|
|
|
+ // 启动消费者
|
|
|
+ startConsumer();
|
|
|
+ // 开始处理
|
|
|
+ dispose();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void startDispose(Class<? extends Consumer<?>> consumer, int consumerCount)
|
|
|
+ throws InstantiationException, IllegalAccessException {
|
|
|
+ startDispose(consumer, consumerCount, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void dispose() {
|
|
|
+ try {
|
|
|
+ LOG.info("*******************Producer:开始处理");
|
|
|
+ // 生产数据
|
|
|
+ produce(param);
|
|
|
+ LOG.info("*******************Producer:生产结束");
|
|
|
+ // 发送生产结束信息
|
|
|
+ endConsumer();
|
|
|
+ LOG.info("*******************Producer:成功发送生产结束信息");
|
|
|
+ // 等待子线程结束
|
|
|
+ LOG.info("*******************Producer:等待消费线程结束");
|
|
|
+ await();
|
|
|
+ LOG.info("*******************Producer:消费线程已结束");
|
|
|
+ // 判断子线程是否正常结束
|
|
|
+ if (basket.isExcuteError()) {
|
|
|
+ throw new StatusException("1000001", "处理失败,线程异常");
|
|
|
+ }
|
|
|
+ LOG.info("*******************Producer:结束处理");
|
|
|
+ } catch (StatusException e) {
|
|
|
+ // 获取异常时发送异常结束信息
|
|
|
+ endConsumerAsError();
|
|
|
+ throw e;
|
|
|
+ } catch (Exception e) {
|
|
|
+ // 获取异常时发送异常结束信息
|
|
|
+ endConsumerAsError();
|
|
|
+ throw new StatusException("1000002", "处理失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 启动消费者
|
|
|
+ *
|
|
|
+ * @param consumer
|
|
|
+ * @throws InstantiationException
|
|
|
+ * @throws IllegalAccessException
|
|
|
+ */
|
|
|
+ private void startConsumer() throws InstantiationException, IllegalAccessException {
|
|
|
+ int count = basket.getConsumerCount();
|
|
|
+ for (int i = 0; i < count; i++) {
|
|
|
+ Consumer<?> co = (Consumer<?>) consumer.newInstance();
|
|
|
+ co.initResult();
|
|
|
+ co.setBasket(basket);
|
|
|
+ co.setTraceId(ThreadContext.get("TRACE_ID"));
|
|
|
+ consumers.add(co);
|
|
|
+ co.start();
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 出异常后修改标识
|
|
|
+ *
|
|
|
+ */
|
|
|
+ private void endConsumerAsError() {
|
|
|
+ basket.setExcuteError(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 正常结束消费者
|
|
|
+ *
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+ private void endConsumer() throws InterruptedException {
|
|
|
+ int count = basket.getConsumerCount();
|
|
|
+ EndObject eo = new EndObject();
|
|
|
+ for (int i = 0; i < count; i++) {
|
|
|
+ basket.offer(eo);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 生产数据
|
|
|
+ *
|
|
|
+ * @param ob
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+ protected void offer(Object ob) throws InterruptedException {
|
|
|
+ synchronized (basket) {
|
|
|
+ basket.offer(ob);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 等待所有消费者结束
|
|
|
+ *
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+ private void await() throws InterruptedException {
|
|
|
+ basket.await();
|
|
|
+ }
|
|
|
+
|
|
|
+ protected abstract void produce(Map<String, Object> param) throws Exception;
|
|
|
+
|
|
|
+ public List<Consumer<?>> getConsumers() {
|
|
|
+ return consumers;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setConsumers(List<Consumer<?>> consumers) {
|
|
|
+ this.consumers = consumers;
|
|
|
+ }
|
|
|
+
|
|
|
}
|