wangwei 6 жил өмнө
parent
commit
f2ea78b7bf

+ 59 - 0
src/main/java/cn/com/qmth/examcloud/web/task/AbstractTask.java

@@ -0,0 +1,59 @@
+package cn.com.qmth.examcloud.web.task;
+
+import cn.com.qmth.examcloud.commons.exception.ExamCloudRuntimeException;
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLog;
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLogFactory;
+import cn.com.qmth.examcloud.commons.util.JsonUtil;
+
+/**
+ * 任务调度抽象类
+ *
+ * @author WANGWEI
+ * @date 201837
+ * @Copyright (c) 2018-2020 WANGWEI [QQ:522080330] All Rights Reserved.
+ */
+public abstract class AbstractTask implements Task {
+
+	protected ExamCloudLog taskLog = ExamCloudLogFactory.getLog("TASK_LOGGER");
+
+	protected ExamCloudLog debugLog = ExamCloudLogFactory.getLog(this.getClass());
+
+	public abstract TaskTracker getTaskTracker();
+
+	public abstract void run(ScheduleJob scheduleJob) throws Exception;
+
+	@Override
+	public void execute(ScheduleJob scheduleJob, String traceId) {
+		try {
+			getTaskTracker().start(scheduleJob, traceId);
+		} catch (Exception e) {
+			taskLog.error("[TASK TRACKER]. start", e);
+		}
+		if (taskLog.isDebugEnabled()) {
+			taskLog.debug("[TASK IN]. detail: " + JsonUtil.toJson(scheduleJob));
+		}
+		try {
+			run(scheduleJob);
+		} catch (Exception e) {
+			try {
+				getTaskTracker().whenException(scheduleJob, traceId, e);
+			} catch (Exception ex) {
+				taskLog.error("[TASK TRACKER]. whenException", ex);
+			}
+			if (taskLog.isErrorEnabled()) {
+				taskLog.error("[TASK EXCEPTION]. detail: " + JsonUtil.toJson(scheduleJob), e);
+			}
+			throw new ExamCloudRuntimeException(e);
+		}
+		try {
+			getTaskTracker().onEnd(scheduleJob, traceId);
+		} catch (Exception e) {
+			taskLog.error("[TASK TRACKER]. onEnd", e);
+		}
+		if (taskLog.isDebugEnabled()) {
+			taskLog.debug("[TASK OUT]. detail: " + JsonUtil.toJson(scheduleJob));
+		}
+
+	}
+
+}

+ 52 - 0
src/main/java/cn/com/qmth/examcloud/web/task/DistributionJob.java

@@ -0,0 +1,52 @@
+package cn.com.qmth.examcloud.web.task;
+
+import org.apache.logging.log4j.ThreadContext;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLog;
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLogFactory;
+import cn.com.qmth.examcloud.commons.util.JsonUtil;
+import cn.com.qmth.examcloud.commons.util.ThreadLocalUtil;
+import cn.com.qmth.examcloud.web.support.SpringContextHolder;
+
+/**
+ * 并行任务分发器
+ *
+ * @author WANGWEI
+ * @date 201837
+ * @Copyright (c) 2018-2020 WANGWEI [QQ:522080330] All Rights Reserved.
+ */
+public class DistributionJob implements Job {
+	private static final ExamCloudLog TASK_LOG = ExamCloudLogFactory.getLog("TASK_LOGGER");
+
+	@Override
+	public void execute(JobExecutionContext context) throws JobExecutionException {
+		String traceId = ThreadLocalUtil.next();
+		ThreadContext.put("TRACE_ID", traceId);
+		ScheduleJob scheduleJob = null;
+		try {
+			scheduleJob = (ScheduleJob) context.getMergedJobDataMap().get("scheduleJob");
+
+			if (TASK_LOG.isDebugEnabled()) {
+				TASK_LOG.debug("distribute job. job detail :" + JsonUtil.toJson(scheduleJob));
+			}
+			Object bean = SpringContextHolder.getBean(scheduleJob.getSpringBean());
+
+			Task task = (Task) bean;
+			task.execute(scheduleJob, traceId);
+		} catch (Exception e) {
+			if (TASK_LOG.isErrorEnabled()) {
+				TASK_LOG.error(
+						"fail to distribute job. job detail :" + JsonUtil.toJson(scheduleJob), e);
+			}
+			throw new JobExecutionException(e);
+		} finally {
+			// 清理log4j线程上下文
+			ThreadContext.clearAll();
+		}
+
+	}
+
+}

+ 262 - 0
src/main/java/cn/com/qmth/examcloud/web/task/QuartzManager.java

@@ -0,0 +1,262 @@
+package cn.com.qmth.examcloud.web.task;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.quartz.CronScheduleBuilder;
+import org.quartz.CronTrigger;
+import org.quartz.Job;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.quartz.TriggerKey;
+import org.quartz.impl.matchers.GroupMatcher;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.quartz.SchedulerFactoryBean;
+import org.springframework.stereotype.Component;
+
+import cn.com.qmth.examcloud.commons.exception.ExamCloudRuntimeException;
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLog;
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLogFactory;
+import cn.com.qmth.examcloud.commons.util.JsonUtil;
+
+/**
+ * Quartz 管理器
+ *
+ * @author WANGWEI
+ * @date 201837
+ * @Copyright (c) 2018-2020 WANGWEI [QQ:522080330] All Rights Reserved.
+ */
+@Component
+public class QuartzManager {
+	private static final ExamCloudLog TASK_LOG = ExamCloudLogFactory.getLog("TASK_LOGGER");
+
+	@Autowired
+	private SchedulerFactoryBean schedulerFactoryBean;
+
+	/**
+	 * 添加任务
+	 *
+	 * @author WANGWEI
+	 * @param job
+	 */
+	public void addJob(ScheduleJob job) {
+		if (TASK_LOG.isDebugEnabled()) {
+			TASK_LOG.debug("add a job. job detail: " + JsonUtil.toJson(job));
+		}
+
+		try {
+			Scheduler scheduler = schedulerFactoryBean.getScheduler();
+			TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup());
+
+			CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
+
+			if (null == trigger) {
+				Class<? extends Job> jobClass = job.isStateful()
+						? StatefulDistributionJob.class
+						: DistributionJob.class;
+				JobDetail jobDetail = JobBuilder.newJob(jobClass)
+						.withIdentity(job.getJobName(), job.getJobGroup()).build();
+
+				jobDetail.getJobDataMap().put("scheduleJob", job);
+
+				CronScheduleBuilder scheduleBuilder = CronScheduleBuilder
+						.cronSchedule(job.getCronExpression());
+
+				trigger = TriggerBuilder.newTrigger()
+						.withIdentity(job.getJobName(), job.getJobGroup())
+						.withSchedule(scheduleBuilder).build();
+
+				scheduler.scheduleJob(jobDetail, trigger);
+			} else {
+				CronScheduleBuilder scheduleBuilder = CronScheduleBuilder
+						.cronSchedule(job.getCronExpression());
+
+				trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
+						.withSchedule(scheduleBuilder).build();
+
+				scheduler.rescheduleJob(triggerKey, trigger);
+			}
+		} catch (SchedulerException e) {
+			TASK_LOG.error("Fail to add a job. job detail: " + JsonUtil.toJson(job), e);
+			throw new ExamCloudRuntimeException(e);
+		}
+
+		if (TASK_LOG.isDebugEnabled()) {
+			TASK_LOG.debug("add a job successfully. job detail: " + JsonUtil.toJson(job));
+		}
+	}
+
+	/**
+	 * 获取所有计划中的任务列表
+	 *
+	 * @author WANGWEI
+	 * @return
+	 */
+	public List<ScheduleJob> getAllJobs() {
+		try {
+			Scheduler scheduler = schedulerFactoryBean.getScheduler();
+			GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
+			Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
+			List<ScheduleJob> jobList = new ArrayList<ScheduleJob>();
+			for (JobKey jobKey : jobKeys) {
+				List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
+				for (Trigger trigger : triggers) {
+					ScheduleJob job = new ScheduleJob();
+					job.setJobName(jobKey.getName());
+					job.setJobGroup(jobKey.getGroup());
+					Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
+					job.setTriggerState(triggerState.name());
+					if (trigger instanceof CronTrigger) {
+						CronTrigger cronTrigger = (CronTrigger) trigger;
+						String cronExpression = cronTrigger.getCronExpression();
+						job.setCronExpression(cronExpression);
+					}
+					jobList.add(job);
+				}
+			}
+			return jobList;
+		} catch (SchedulerException e) {
+			TASK_LOG.error("Fail to get all jobs.", e);
+			throw new ExamCloudRuntimeException(e);
+		}
+	}
+
+	/**
+	 * 所有正在运行的job
+	 *
+	 * @author WANGWEI
+	 * @return
+	 */
+	public List<ScheduleJob> getRunningJobs() {
+		try {
+			Scheduler scheduler = schedulerFactoryBean.getScheduler();
+			List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
+			List<ScheduleJob> jobList = new ArrayList<ScheduleJob>(executingJobs.size());
+			for (JobExecutionContext context : executingJobs) {
+				ScheduleJob job = new ScheduleJob();
+				JobDetail jobDetail = context.getJobDetail();
+				JobKey jobKey = jobDetail.getKey();
+				Trigger trigger = context.getTrigger();
+				job.setJobName(jobKey.getName());
+				job.setJobGroup(jobKey.getGroup());
+				Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
+				job.setTriggerState(triggerState.name());
+				if (trigger instanceof CronTrigger) {
+					CronTrigger cronTrigger = (CronTrigger) trigger;
+					String cronExpression = cronTrigger.getCronExpression();
+					job.setCronExpression(cronExpression);
+				}
+				jobList.add(job);
+			}
+			return jobList;
+		} catch (Exception e) {
+			TASK_LOG.error("Fail to get running jobs.", e);
+			throw new ExamCloudRuntimeException(e);
+		}
+	}
+
+	/**
+	 * 暂停一个job
+	 *
+	 * @author WANGWEI
+	 * @param scheduleJob
+	 */
+	public void pauseJob(ScheduleJob scheduleJob) {
+		try {
+			Scheduler scheduler = schedulerFactoryBean.getScheduler();
+			JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
+			scheduler.pauseJob(jobKey);
+		} catch (SchedulerException e) {
+			TASK_LOG.error("Fail to pause Job. job detail: " + JsonUtil.toJson(scheduleJob), e);
+			throw new ExamCloudRuntimeException(e);
+		}
+	}
+
+	/**
+	 * 恢复一个job
+	 *
+	 * @author WANGWEI
+	 * @param scheduleJob
+	 */
+	public void resumeJob(ScheduleJob scheduleJob) {
+		try {
+			Scheduler scheduler = schedulerFactoryBean.getScheduler();
+			JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
+			scheduler.resumeJob(jobKey);
+		} catch (SchedulerException e) {
+			TASK_LOG.error("Fail to resume job. job detail: " + JsonUtil.toJson(scheduleJob), e);
+			throw new ExamCloudRuntimeException(e);
+		}
+	}
+
+	/**
+	 * 删除一个job
+	 *
+	 * @author WANGWEI
+	 * @param scheduleJob
+	 */
+	public void deleteJob(ScheduleJob scheduleJob) {
+		try {
+			Scheduler scheduler = schedulerFactoryBean.getScheduler();
+			JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
+			scheduler.deleteJob(jobKey);
+		} catch (SchedulerException e) {
+			TASK_LOG.error("Fail to delete job. job detail: " + JsonUtil.toJson(scheduleJob), e);
+			throw new ExamCloudRuntimeException(e);
+		}
+
+	}
+
+	/**
+	 * 执行job
+	 *
+	 * @author WANGWEI
+	 * @param scheduleJob
+	 */
+	public void runJob(ScheduleJob scheduleJob) {
+		try {
+			Scheduler scheduler = schedulerFactoryBean.getScheduler();
+			JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
+			scheduler.triggerJob(jobKey);
+		} catch (SchedulerException e) {
+			TASK_LOG.error("Fail to run job. job detail: " + JsonUtil.toJson(scheduleJob), e);
+			throw new ExamCloudRuntimeException(e);
+		}
+	}
+
+	/**
+	 * 更新job时间表达式
+	 *
+	 * @author WANGWEI
+	 * @param scheduleJob
+	 */
+	public void updateJobCronExpression(ScheduleJob scheduleJob) {
+		try {
+			Scheduler scheduler = schedulerFactoryBean.getScheduler();
+
+			TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(),
+					scheduleJob.getJobGroup());
+
+			CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
+
+			CronScheduleBuilder scheduleBuilder = CronScheduleBuilder
+					.cronSchedule(scheduleJob.getCronExpression());
+
+			trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
+					.withSchedule(scheduleBuilder).build();
+
+			scheduler.rescheduleJob(triggerKey, trigger);
+		} catch (SchedulerException e) {
+			TASK_LOG.error("Fail to update job cron expression. job detail :"
+					+ JsonUtil.toJson(scheduleJob), e);
+			throw new ExamCloudRuntimeException(e);
+		}
+	}
+}

+ 166 - 0
src/main/java/cn/com/qmth/examcloud/web/task/ScheduleJob.java

@@ -0,0 +1,166 @@
+package cn.com.qmth.examcloud.web.task;
+
+import java.io.Serializable;
+
+/**
+ * 任务配置
+ *
+ * @author WANGWEI
+ * @date 201837
+ * @Copyright (c) 2018-2020 WANGWEI [QQ:522080330] All Rights Reserved.
+ */
+public class ScheduleJob implements Serializable {
+	private static final long serialVersionUID = -1638993747996917970L;
+
+	private Long jobId;
+
+	/**
+	 * 任务名称
+	 */
+	private String jobName;
+
+	/**
+	 * 任务分组
+	 */
+	private String jobGroup;
+
+	/**
+	 * 触发器状态
+	 */
+	private String triggerState;
+
+	/**
+	 * cron表达式
+	 */
+	private String cronExpression;
+
+	/**
+	 * 描述
+	 */
+	private String description;
+
+	/**
+	 * Spring bean
+	 */
+	private String springBean;
+
+	/**
+	 * 是否顺序执行
+	 */
+	private boolean stateful = true;
+
+	private String ext1;
+
+	private String ext2;
+
+	private String ext3;
+
+	private String ext4;
+
+	private String ext5;
+
+	public Long getJobId() {
+		return jobId;
+	}
+
+	public void setJobId(Long jobId) {
+		this.jobId = jobId;
+	}
+
+	public String getJobName() {
+		return jobName;
+	}
+
+	public void setJobName(String jobName) {
+		this.jobName = jobName;
+	}
+
+	public String getJobGroup() {
+		return jobGroup;
+	}
+
+	public void setJobGroup(String jobGroup) {
+		this.jobGroup = jobGroup;
+	}
+
+	public String getTriggerState() {
+		return triggerState;
+	}
+
+	public void setTriggerState(String triggerState) {
+		this.triggerState = triggerState;
+	}
+
+	public String getCronExpression() {
+		return cronExpression;
+	}
+
+	public void setCronExpression(String cronExpression) {
+		this.cronExpression = cronExpression;
+	}
+
+	public String getDescription() {
+		return description;
+	}
+
+	public void setDescription(String description) {
+		this.description = description;
+	}
+
+	public String getSpringBean() {
+		return springBean;
+	}
+
+	public void setSpringBean(String springBean) {
+		this.springBean = springBean;
+	}
+
+	public boolean isStateful() {
+		return stateful;
+	}
+
+	public void setStateful(boolean stateful) {
+		this.stateful = stateful;
+	}
+
+	public String getExt1() {
+		return ext1;
+	}
+
+	public void setExt1(String ext1) {
+		this.ext1 = ext1;
+	}
+
+	public String getExt2() {
+		return ext2;
+	}
+
+	public void setExt2(String ext2) {
+		this.ext2 = ext2;
+	}
+
+	public String getExt3() {
+		return ext3;
+	}
+
+	public void setExt3(String ext3) {
+		this.ext3 = ext3;
+	}
+
+	public String getExt4() {
+		return ext4;
+	}
+
+	public void setExt4(String ext4) {
+		this.ext4 = ext4;
+	}
+
+	public String getExt5() {
+		return ext5;
+	}
+
+	public void setExt5(String ext5) {
+		this.ext5 = ext5;
+	}
+
+}

+ 18 - 0
src/main/java/cn/com/qmth/examcloud/web/task/StatefulDistributionJob.java

@@ -0,0 +1,18 @@
+package cn.com.qmth.examcloud.web.task;
+
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.PersistJobDataAfterExecution;
+
+/**
+ * 顺序执行任务分发器
+ *
+ * @author WANGWEI
+ * @date 201837
+ * @Copyright (c) 2018-2020 WANGWEI [QQ:522080330] All Rights Reserved.
+ */
+@DisallowConcurrentExecution
+@PersistJobDataAfterExecution
+public class StatefulDistributionJob extends DistributionJob
+{
+
+}

+ 20 - 0
src/main/java/cn/com/qmth/examcloud/web/task/Task.java

@@ -0,0 +1,20 @@
+package cn.com.qmth.examcloud.web.task;
+
+/**
+ * 任务接口
+ *
+ * @author WANGWEI
+ * @date 201837
+ * @Copyright (c) 2018-2020 WANGWEI [QQ:522080330] All Rights Reserved.
+ */
+public interface Task {
+
+	/**
+	 * 执行任务
+	 *
+	 * @author WANGWEI
+	 * @param scheduleJob
+	 * @param traceId
+	 */
+	void execute(ScheduleJob scheduleJob, String traceId);
+}

+ 40 - 0
src/main/java/cn/com/qmth/examcloud/web/task/TaskTracker.java

@@ -0,0 +1,40 @@
+package cn.com.qmth.examcloud.web.task;
+
+/**
+ * 任务跟踪记载器
+ *
+ * @author WANGWEI
+ * @date 2018725
+ * @Copyright (c) 2018-? http://qmth.com.cn All Rights Reserved.
+ */
+public interface TaskTracker {
+
+	/**
+	 * 启动
+	 *
+	 * @author WANGWEI
+	 * @param scheduleJob
+	 * @param traceId
+	 */
+	void start(final ScheduleJob scheduleJob, final String traceId);
+
+	/**
+	 * 异常
+	 *
+	 * @author WANGWEI
+	 * @param scheduleJob
+	 * @param traceId
+	 * @param e
+	 */
+	void whenException(final ScheduleJob scheduleJob, final String traceId, Exception e);
+
+	/**
+	 * 结束
+	 *
+	 * @author WANGWEI
+	 * @param scheduleJob
+	 * @param traceId
+	 */
+	void onEnd(final ScheduleJob scheduleJob, final String traceId);
+
+}