Browse Source

报表计算任务

xiatian 6 years ago
parent
commit
96ceca14aa

+ 103 - 0
examcloud-task-base/src/main/java/cn/com/qmth/examcloud/task/base/multithread/Basket.java

@@ -0,0 +1,103 @@
+package cn.com.qmth.examcloud.task.base.multithread;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import cn.com.qmth.examcloud.commons.exception.StatusException;
+
+public  class  Basket {
+	private static final Logger logger = LoggerFactory.getLogger(Basket.class);
+	/**
+	 * 数据阻塞队列
+	 */
+	private BlockingQueue<Object> queue;
+	
+	/**
+	 * 多线程计数器,子线程都结束后主线程才继续执行
+	 */
+	private CountDownLatch endGate;
+	
+	/**
+	 * 消费者数量
+	 */
+	private int consumerCount;
+	
+	
+	/**
+	 * 判断线程执行是否有出错,生产者、消费者出错都需要修改此值为true
+	 */
+	private boolean isExcuteError = false;
+	
+	
+	public Basket(int consumerCount) {
+		this.consumerCount=consumerCount;
+		queue = new ArrayBlockingQueue<Object>(consumerCount*2);
+		endGate = new CountDownLatch(consumerCount);
+	}
+
+	/**
+	 * 生产数据,不采用put方法防止消费线程全部异常后生产线程阻塞
+	 * @param value
+	 * @throws InterruptedException
+	 */
+	protected void offer(final Object value) throws InterruptedException {
+		if(isExcuteError) {
+			logger.error("**********************offer isExcuteError threadId:"+Thread.currentThread().getId());
+			throw new StatusException("1000001","线程异常");
+		}else {
+			boolean ret=queue.offer(value, 1, TimeUnit.MINUTES);
+			if(!ret) {
+				logger.info("**********************offer time out threadId:"+Thread.currentThread().getId()+value);
+				this.offer(value);
+			}
+		}
+	}
+	/**
+	 * 消费数据,不采用take方法防止生产线程全部异常后消费线程阻塞
+	 * @return
+	 * @throws InterruptedException
+	 */
+	protected Object consume() throws InterruptedException {
+		if(isExcuteError) {
+			logger.error("**********************poll isExcuteError  threadId:"+Thread.currentThread().getId());
+			return new EndObject();
+		}else {
+			Object ob=queue.poll(1, TimeUnit.MINUTES);
+			if(ob==null) {
+				logger.info("**********************poll time out  threadId:"+Thread.currentThread().getId());
+				return this.consume();
+			}else {
+				return ob;
+			}
+		}
+	}
+	
+	protected void await() throws InterruptedException {
+		endGate.await();
+	}
+	protected void countDown() {
+		endGate.countDown();
+	}
+
+	protected boolean isExcuteError() {
+		return isExcuteError;
+	}
+
+	protected void setExcuteError(boolean isExcuteError) {
+		this.isExcuteError = isExcuteError;
+	}
+
+	protected int getConsumerCount() {
+		return consumerCount;
+	}
+
+	protected void setConsumerCount(int consumerCount) {
+		this.consumerCount = consumerCount;
+	}
+
+}

+ 60 - 0
examcloud-task-base/src/main/java/cn/com/qmth/examcloud/task/base/multithread/Consumer.java

@@ -0,0 +1,60 @@
+package cn.com.qmth.examcloud.task.base.multithread;
+
+import org.apache.logging.log4j.ThreadContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class Consumer<T>  extends Thread{
+	private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
+	private Basket basket;
+	
+	private String traceId;
+	
+	public Consumer() {
+	}
+	@Override
+	public void run() {
+		logger.info("*******************Consumer:"+Thread.currentThread().getId()+" start");
+		ThreadContext.put("TRACE_ID", traceId);
+		try {
+			while (true) {
+				//先判断是否有异常结束
+				if(basket.isExcuteError()) {
+					break;
+				}
+				//取消费数据
+				Object o= basket.consume();
+				//判断消费数据是否是结束
+				if(o instanceof EndObject) {
+					break;
+				}
+				@SuppressWarnings("unchecked")
+				T t=(T)o;
+				logger.info("*******************Consumer:"+Thread.currentThread().getId()+" consume");
+				//消费数据实现
+				consume(t);
+			}
+		} catch (Exception e) {
+			basket.setExcuteError(true);
+			logger.info("消费线程处理出错",e);
+		}finally {
+			basket.countDown();
+			logger.info("*******************Consumer:"+Thread.currentThread().getId()+" stop");
+			ThreadContext.clearAll();
+		}
+	}
+	public abstract void consume(T t);
+	public Basket getBasket() {
+		return basket;
+	}
+	public void setBasket(Basket basket) {
+		this.basket = basket;
+	}
+	public String getTraceId() {
+		return traceId;
+	}
+	public void setTraceId(String traceId) {
+		this.traceId = traceId;
+	}
+	
+}

+ 10 - 0
examcloud-task-base/src/main/java/cn/com/qmth/examcloud/task/base/multithread/EndObject.java

@@ -0,0 +1,10 @@
+package cn.com.qmth.examcloud.task.base.multithread;
+
+/**
+ * 消费结束标识对象
+ * @author xiatian
+ *
+ */
+public class EndObject {
+
+}

+ 136 - 0
examcloud-task-base/src/main/java/cn/com/qmth/examcloud/task/base/multithread/Producer.java

@@ -0,0 +1,136 @@
+package cn.com.qmth.examcloud.task.base.multithread;
+
+import java.util.Map;
+
+import org.apache.logging.log4j.ThreadContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import cn.com.qmth.examcloud.commons.exception.StatusException;
+
+public abstract class Producer {
+	private static final Logger logger = LoggerFactory.getLogger(Producer.class);
+	private Basket basket;
+	
+	/**
+	 * 业务参数
+	 */
+	private Map<String, Object> param;
+	
+	/**
+	 * 消费线程class
+	 */
+	private Class<? extends Consumer<?>> consumer;
+	/**
+	 * 	处理开始方法
+	 * @param consumer 消费线程class
+	 * @param consumerCount 消费线程数
+	 * @param param 生产者业务参数
+	 * @throws InstantiationException
+	 * @throws IllegalAccessException
+	 */
+	public void startDispose(Class<? extends Consumer<?>> consumer, int consumerCount,Map<String, Object> param)
+			throws InstantiationException, IllegalAccessException {
+		Basket basket = new Basket(consumerCount);
+		this.basket = basket;
+		this.consumer = consumer;
+		this.param=param;
+		//启动消费者
+		startConsumer();
+		//开始处理
+		dispose();
+	}
+
+	private void dispose() {
+		try {
+			logger.info("*******************Producer:开始处理");
+			// 生产数据
+			produce(param);
+			logger.info("*******************Producer:生产结束");
+			// 发送生产结束信息
+			endConsumer();
+			logger.info("*******************Producer:成功发送生产结束信息");
+			// 等待子线程结束
+			logger.info("*******************Producer:等待消费线程结束");
+			await();
+			logger.info("*******************Producer:消费线程已结束");
+			// 判断子线程是否正常结束
+			if (basket.isExcuteError()) {
+				throw new StatusException("1000001", "处理失败,线程异常");
+			}
+			logger.info("*******************Producer:结束处理");
+		} catch (StatusException e) {
+			// 获取异常时发送异常结束信息
+			endConsumerAsError();
+			throw e;
+		} catch (Exception e) {
+			// 获取异常时发送异常结束信息
+			endConsumerAsError();
+			throw new StatusException("1000002", "处理失败", e);
+		}
+	}
+
+	/**
+	 * 启动消费者
+	 * 
+	 * @param consumer
+	 * @throws InstantiationException
+	 * @throws IllegalAccessException
+	 */
+	private void startConsumer() throws InstantiationException, IllegalAccessException {
+		int count = basket.getConsumerCount();
+		for (int i = 0; i < count; i++) {
+			Consumer<?> co = (Consumer<?>) consumer.newInstance();
+			co.setBasket(basket);
+			co.setTraceId(ThreadContext.get("TRACE_ID"));
+			co.start();
+		}
+
+	}
+
+	/**
+	 * 出异常后修改标识
+	 * 
+	 */
+	private void endConsumerAsError() {
+		basket.setExcuteError(true);
+	}
+
+	/**
+	 * 正常结束消费者
+	 * 
+	 * @throws InterruptedException
+	 */
+	private void endConsumer() throws InterruptedException {
+		int count = basket.getConsumerCount();
+		EndObject eo = new EndObject();
+		for (int i = 0; i < count; i++) {
+			basket.offer(eo);
+		}
+
+	}
+
+	/**
+	 * 生产数据
+	 * 
+	 * @param ob
+	 * @throws InterruptedException
+	 */
+	protected void offer(Object ob) throws InterruptedException {
+		synchronized (basket) {
+			basket.offer(ob);
+		}
+	}
+
+	/**
+	 * 等待所有消费者结束
+	 * 
+	 * @throws InterruptedException
+	 */
+	private void await() throws InterruptedException {
+		basket.await();
+	}
+
+	protected abstract void produce(Map<String, Object> param) throws Exception;
+
+}

+ 20 - 0
examcloud-task-dao/src/main/java/cn/com/qmth/examcloud/task/dao/ReportsComputeRepo.java

@@ -0,0 +1,20 @@
+package cn.com.qmth.examcloud.task.dao;
+
+import java.util.List;
+
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
+import org.springframework.data.jpa.repository.Query;
+import org.springframework.stereotype.Repository;
+
+import cn.com.qmth.examcloud.task.dao.entity.ReportsComputeEntity;
+
+@Repository
+public interface ReportsComputeRepo
+		extends
+			JpaRepository<ReportsComputeEntity, Long>,
+			JpaSpecificationExecutor<ReportsComputeEntity> {
+	@Query(value = "SELECT t.* FROM EC_T_REPORTS_COMPUTE t "+
+			"WHERE t.id>?1 and t.status ='NONE' ORDER BY t.id limit ?2", nativeQuery = true)
+	public List<ReportsComputeEntity> findTodoData(Long startId,Integer limit);
+}

+ 99 - 0
examcloud-task-dao/src/main/java/cn/com/qmth/examcloud/task/dao/entity/ReportsComputeEntity.java

@@ -0,0 +1,99 @@
+package cn.com.qmth.examcloud.task.dao.entity;
+
+import java.util.Date;
+
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Index;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+import javax.validation.constraints.NotNull;
+
+import cn.com.qmth.examcloud.task.dao.enums.ReportsComputeStatus;
+import cn.com.qmth.examcloud.web.jpa.JpaEntity;
+
+@Entity
+@Table(name = "EC_T_REPORTS_COMPUTE", indexes = {
+		@Index(name = "IDX_T_REPORTS_COMPUTE_01", columnList = "projectId", unique = false)})
+public class ReportsComputeEntity extends JpaEntity {
+
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = -1847946836121160945L;
+
+	@Id
+	@GeneratedValue(strategy = GenerationType.IDENTITY)
+	private Long id;
+	
+	@NotNull
+	private Long projectId;
+	
+	@NotNull
+	@Enumerated(EnumType.STRING)
+	private ReportsComputeStatus status;
+	
+	@Temporal(TemporalType.TIMESTAMP)
+	private Date startTime;
+	
+	@Temporal(TemporalType.TIMESTAMP)
+	private Date endTime;
+
+	private String errorDesc;
+
+	public Long getId() {
+		return id;
+	}
+
+	public void setId(Long id) {
+		this.id = id;
+	}
+
+	public Long getProjectId() {
+		return projectId;
+	}
+
+	public void setProjectId(Long projectId) {
+		this.projectId = projectId;
+	}
+
+	public ReportsComputeStatus getStatus() {
+		return status;
+	}
+
+	public void setStatus(ReportsComputeStatus status) {
+		this.status = status;
+	}
+
+	public Date getStartTime() {
+		return startTime;
+	}
+
+	public void setStartTime(Date startTime) {
+		this.startTime = startTime;
+	}
+
+	public Date getEndTime() {
+		return endTime;
+	}
+
+	public void setEndTime(Date endTime) {
+		this.endTime = endTime;
+	}
+
+	public String getErrorDesc() {
+		return errorDesc;
+	}
+
+	public void setErrorDesc(String errorDesc) {
+		this.errorDesc = errorDesc;
+	}
+	
+	
+}

+ 27 - 0
examcloud-task-dao/src/main/java/cn/com/qmth/examcloud/task/dao/enums/ReportsComputeStatus.java

@@ -0,0 +1,27 @@
+package cn.com.qmth.examcloud.task.dao.enums;
+public enum ReportsComputeStatus {
+
+	NONE("待处理"), COMPUTING("处理中"), SUCCESS("处理成功"), FAIL(
+			"处理失败"), STOPING("待终止"), STOP("已终止");
+
+	// ===========================================================================
+
+	/**
+	 * 描述
+	 */
+	private String desc;
+
+	/**
+	 * 构造函数
+	 *
+	 * @param desc
+	 */
+	private ReportsComputeStatus(String desc) {
+		this.desc = desc;
+	}
+
+	public String getDesc() {
+		return desc;
+	}
+
+}

+ 52 - 47
examcloud-task-service/pom.xml

@@ -1,48 +1,53 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<groupId>cn.com.qmth.examcloud.task</groupId>
-		<artifactId>examcloud-task</artifactId>
-		<version>2019-SNAPSHOT</version>
-	</parent>
-	<artifactId>examcloud-task-service</artifactId>
-
-	<dependencies>
-		<dependency>
-			<groupId>cn.com.qmth.examcloud.task</groupId>
-			<artifactId>examcloud-task-dao</artifactId>
-			<version>${examcloud.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>cn.com.qmth.examcloud.rpc</groupId>
-			<artifactId>examcloud-core-basic-api-client</artifactId>
-			<version>${examcloud.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>cn.com.qmth.examcloud.rpc</groupId>
-			<artifactId>examcloud-core-examwork-api-client</artifactId>
-			<version>${examcloud.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>cn.com.qmth.examcloud.rpc</groupId>
-			<artifactId>examcloud-global-api-client</artifactId>
-			<version>${examcloud.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>cn.com.qmth.examcloud.rpc</groupId>
-			<artifactId>examcloud-core-oe-student-api-client</artifactId>
-			<version>${examcloud.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>cn.com.qmth.examcloud.rpc</groupId>
-			<artifactId>examcloud-core-oe-admin-api-client</artifactId>
-			<version>${examcloud.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>cn.com.qmth.examcloud.rpc</groupId>
-			<artifactId>examcloud-core-oe-student-face-api-client</artifactId>
-			<version>${examcloud.version}</version>
-		</dependency>
-	</dependencies>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>cn.com.qmth.examcloud.task</groupId>
+		<artifactId>examcloud-task</artifactId>
+		<version>2019-SNAPSHOT</version>
+	</parent>
+	<artifactId>examcloud-task-service</artifactId>
+
+	<dependencies>
+		<dependency>
+			<groupId>cn.com.qmth.examcloud.task</groupId>
+			<artifactId>examcloud-task-dao</artifactId>
+			<version>${examcloud.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>cn.com.qmth.examcloud.rpc</groupId>
+			<artifactId>examcloud-core-basic-api-client</artifactId>
+			<version>${examcloud.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>cn.com.qmth.examcloud.rpc</groupId>
+			<artifactId>examcloud-core-examwork-api-client</artifactId>
+			<version>${examcloud.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>cn.com.qmth.examcloud.rpc</groupId>
+			<artifactId>examcloud-global-api-client</artifactId>
+			<version>${examcloud.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>cn.com.qmth.examcloud.rpc</groupId>
+			<artifactId>examcloud-core-oe-student-api-client</artifactId>
+			<version>${examcloud.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>cn.com.qmth.examcloud.rpc</groupId>
+			<artifactId>examcloud-core-oe-admin-api-client</artifactId>
+			<version>${examcloud.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>cn.com.qmth.examcloud.rpc</groupId>
+			<artifactId>examcloud-core-oe-student-face-api-client</artifactId>
+			<version>${examcloud.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>cn.com.qmth.examcloud.rpc</groupId>
+			<artifactId>examcloud-core-reports-api-client</artifactId>
+			<version>${examcloud.version}</version>
+		</dependency>
+	</dependencies>
 </project>

+ 17 - 0
examcloud-task-service/src/main/java/cn/com/qmth/examcloud/task/service/ReportsComputeService.java

@@ -0,0 +1,17 @@
+package cn.com.qmth.examcloud.task.service;
+
+import java.util.List;
+
+import cn.com.qmth.examcloud.task.dao.entity.ReportsComputeEntity;
+
+public interface ReportsComputeService {
+	
+	public List<ReportsComputeEntity> findTodoData(Long startId,Integer limit);
+	public void update(ReportsComputeEntity et);
+	public void compute(ReportsComputeEntity et);
+	public void updateToComputing(ReportsComputeEntity et);
+	public void updateToSuccess(ReportsComputeEntity et);
+	public void updateToFail(ReportsComputeEntity et);
+	public void updateToStoping(ReportsComputeEntity et);
+	public void updateToStop(ReportsComputeEntity et);
+}

+ 31 - 0
examcloud-task-service/src/main/java/cn/com/qmth/examcloud/task/service/consumer/ReportsComputeConsumer.java

@@ -0,0 +1,31 @@
+package cn.com.qmth.examcloud.task.service.consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import cn.com.qmth.examcloud.task.base.multithread.Consumer;
+import cn.com.qmth.examcloud.task.dao.entity.ReportsComputeEntity;
+import cn.com.qmth.examcloud.task.service.ReportsComputeService;
+import cn.com.qmth.examcloud.web.support.SpringContextHolder;
+
+public class ReportsComputeConsumer extends Consumer<ReportsComputeEntity> {
+
+	private static final Logger logger = LoggerFactory.getLogger(ReportsComputeConsumer.class);
+	private ReportsComputeService reportsComputeService = SpringContextHolder.getBean(ReportsComputeService.class);
+
+	@Override
+	public void consume(ReportsComputeEntity et) {
+		logger.info("***************************报表计算开始,projectId:"+et.getProjectId());
+		// 修改报表计算任务状态
+		reportsComputeService.updateToComputing(et);
+		try {
+			//计算报表
+			reportsComputeService.compute(et);
+		} catch (Exception e) {
+			//计算出错
+			reportsComputeService.updateToFail(et);
+			logger.info("***************************报表计算出错,projectId:"+et.getProjectId());
+		}
+		logger.info("***************************报表计算结束,projectId:"+et.getProjectId());
+	}
+}

+ 98 - 0
examcloud-task-service/src/main/java/cn/com/qmth/examcloud/task/service/impl/ReportsComputeServiceImpl.java

@@ -0,0 +1,98 @@
+package cn.com.qmth.examcloud.task.service.impl;
+
+import java.util.Date;
+import java.util.List;
+
+import javax.transaction.Transactional;
+
+import org.examcloud.core.reports.api.ProjectCloudService;
+import org.examcloud.core.reports.api.request.UpdateProjectStatusReq;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import cn.com.qmth.examcloud.task.dao.ReportsComputeRepo;
+import cn.com.qmth.examcloud.task.dao.entity.ReportsComputeEntity;
+import cn.com.qmth.examcloud.task.dao.enums.ReportsComputeStatus;
+import cn.com.qmth.examcloud.task.service.ReportsComputeService;
+
+@Service
+public class ReportsComputeServiceImpl implements ReportsComputeService {
+
+	@Autowired
+	private ReportsComputeRepo reportsComputeRepo;
+	
+	@Autowired
+	private ProjectCloudService projectCloudService;
+
+	@Override
+	public List<ReportsComputeEntity> findTodoData(Long startId, Integer limit) {
+		return reportsComputeRepo.findTodoData(startId, limit);
+	}
+
+	@Transactional
+	@Override
+	public void update(ReportsComputeEntity et) {
+		reportsComputeRepo.save(et);
+	}
+
+	@Transactional
+	@Override
+	public void compute(ReportsComputeEntity et) {
+		// TODO Auto-generated method stub
+		
+	}
+	@Transactional
+	@Override
+	public void updateToComputing(ReportsComputeEntity et) {
+		et.setStatus(ReportsComputeStatus.COMPUTING);
+		et.setStartTime(new Date());
+		reportsComputeRepo.save(et);
+		UpdateProjectStatusReq req=new UpdateProjectStatusReq();
+		req.setProjectId(et.getId());
+		req.setStatus(2);
+		projectCloudService.updateProjectStatus(req);
+	}
+	@Transactional
+	@Override
+	public void updateToSuccess(ReportsComputeEntity et) {
+		et.setStatus(ReportsComputeStatus.SUCCESS);
+		et.setEndTime(new Date());
+		reportsComputeRepo.save(et);
+		UpdateProjectStatusReq req=new UpdateProjectStatusReq();
+		req.setProjectId(et.getId());
+		req.setStatus(3);
+		projectCloudService.updateProjectStatus(req);
+	}
+	@Transactional
+	@Override
+	public void updateToFail(ReportsComputeEntity et) {
+		et.setStatus(ReportsComputeStatus.FAIL);
+		et.setErrorDesc("系统错误");
+		et.setEndTime(new Date());
+		reportsComputeRepo.save(et);
+		UpdateProjectStatusReq req=new UpdateProjectStatusReq();
+		req.setProjectId(et.getId());
+		req.setStatus(4);
+		projectCloudService.updateProjectStatus(req);
+	}
+	@Transactional
+	@Override
+	public void updateToStoping(ReportsComputeEntity et) {
+		et.setStatus(ReportsComputeStatus.STOPING);
+		reportsComputeRepo.save(et);
+	}
+	@Transactional
+	@Override
+	public void updateToStop(ReportsComputeEntity et) {
+		et.setStatus(ReportsComputeStatus.STOP);
+		et.setEndTime(new Date());
+		reportsComputeRepo.save(et);
+		UpdateProjectStatusReq req=new UpdateProjectStatusReq();
+		req.setProjectId(et.getId());
+		req.setStatus(5);
+		projectCloudService.updateProjectStatus(req);
+	}
+
+
+
+}

+ 32 - 0
examcloud-task-service/src/main/java/cn/com/qmth/examcloud/task/service/job/ReportsComputeTask.java

@@ -0,0 +1,32 @@
+package cn.com.qmth.examcloud.task.service.job;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import cn.com.qmth.examcloud.task.service.ReportsComputeService;
+import cn.com.qmth.examcloud.task.service.consumer.ReportsComputeConsumer;
+import cn.com.qmth.examcloud.task.service.producer.ReportsComputeProducer;
+import cn.com.qmth.examcloud.web.task.AbstractTask;
+import cn.com.qmth.examcloud.web.task.ScheduleJob;
+import cn.com.qmth.examcloud.web.task.TaskTracker;
+
+@Component("reportsComputeTask")
+public class ReportsComputeTask extends AbstractTask {
+
+	@Autowired
+	ReportsComputeService reportsComputeService;
+
+	@Autowired
+	TaskTracker TaskTracker;
+	
+	@Override
+	public void run(ScheduleJob scheduleJob) throws Exception {
+		ReportsComputeProducer pro=new ReportsComputeProducer();
+		pro.startDispose(ReportsComputeConsumer.class, 1, null);
+	}
+
+	@Override
+	public TaskTracker getTaskTracker() {
+		return TaskTracker;
+	}
+}

+ 42 - 0
examcloud-task-service/src/main/java/cn/com/qmth/examcloud/task/service/producer/ReportsComputeProducer.java

@@ -0,0 +1,42 @@
+package cn.com.qmth.examcloud.task.service.producer;
+
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import cn.com.qmth.examcloud.task.base.multithread.Producer;
+import cn.com.qmth.examcloud.task.dao.entity.ReportsComputeEntity;
+import cn.com.qmth.examcloud.task.service.ReportsComputeService;
+import cn.com.qmth.examcloud.web.support.SpringContextHolder;
+
+public class ReportsComputeProducer extends Producer {
+	private static final Logger logger = LoggerFactory.getLogger(ReportsComputeProducer.class);
+	private ReportsComputeService reportsComputeService = SpringContextHolder.getBean(ReportsComputeService.class);
+
+	@Override
+	protected void produce(Map<String, Object> param) throws Exception {
+		logger.info("***************************报表计算任务生产开始");
+		Long sartId = 0l;
+		Integer limit = 100;
+		for (;;) {
+			List<ReportsComputeEntity> list = reportsComputeService.findTodoData(sartId, limit);
+			int count=0;
+			if(list!=null) {
+				count=list.size();
+			}
+			logger.info("***************************startId:"+sartId+" count:"+count);
+			if (list != null && list.size() > 0) {
+				for(ReportsComputeEntity et:list) {
+					sartId=et.getId();
+					//生产数据
+					offer(et);
+				}
+			} else {
+				break;
+			}
+		}
+		logger.info("***************************报表计算任务生产结束");
+	}
+}