一、效果预览

二、流程梳理
除了数据库中 quartz 需要的 11 张表,我们同时自定义一张表来维护所有的定时任务信息,包括执行的类,执行的方法,并记录总共运行的次数和运行失败的次数(其实最好再加一张运行记录表)。

三、后端实现
前端代码暂不赘述,详见文末源码链接。如下是一些关键逻辑的代码:
① 自定义实体类:
/**
 * JPA entity mapped to the custom {@code job_entity} table, holding one row per
 * scheduled job: its Quartz identity, cron expression, invocation target and
 * run counters. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@Entity
@Table(name="job_entity")
public class JobEntity {
// Status flag values; not referenced by the other code shown alongside this class.
public static final String STATUS_RUNNING = "1";
public static final String STATUS_NOT_RUNNING = "0";
// Values for isConcurrentDisallowed.
public static final String CONCURRENT_DISALLOWED = "1";
public static final String CONCURRENT_ALLOWED = "0";
/** Database-generated primary key. */
@Id
@GeneratedValue(strategy= GenerationType.IDENTITY)
private Long id;
/** Job id (field currently disabled). */
//private String jobId;
/** Job name. */
private String jobName;
/** Job group; job name + group name should be unique. */
private String jobGroup;
/** Job status. */
private String jobStatus;
/** Job data, stored as a string (written as JSON by the service layer). */
private String jobDataMapString;
/** In-memory form of the job data; not persisted. */
@Transient
private Map<String, Object> jobDataMap;
/** Trigger name. */
private String triggerName;
/** Trigger group. */
private String triggerGroup;
/** Trigger time zone. */
private String triggerTimeZone;
/** Whether concurrent execution is disallowed (concurrency is allowed by default). */
private String isConcurrentDisallowed;
/** Cron expression that drives the job. */
private String cronExpression;
/** Job description. */
private String description;
/** Spring bean id of the class to invoke; when springId is not blank the target is looked up by springId. */
private String springId;
/** Fully qualified class name to invoke via reflection; used when springId is blank. */
private String jobClass;
/** Name of the method to invoke on the target. */
private String methodName;
/** Start time. */
private Date startTime;
/** Previous run time. */
private Date previousTime;
/** Next run time. */
private Date nextTime;
/** Number of runs (including failed ones). */
private int jobExecCount;
/** Number of failed runs. */
private int jobExceptionCount;
/** Creation timestamp (column create_dt). */
@Column(name="create_dt")
private Timestamp createDate;
// Soft-delete flag (column is_del) — 1: deleted, 0: not deleted
@Column(name="is_del")
private Boolean isDelete;
/** No-argument constructor. */
public JobEntity(){}
} ② JobService
业务层控制代码:
@Service
@Log4j
public class JobService {
// Injected factory bean; the service methods obtain the Quartz Scheduler from it.
@Autowired
private SchedulerFactoryBean schedulerFactoryBean;
// Injected repository used to query and save JobEntity rows.
@Autowired
JobEntityRepository jobEntityRepository;
/**
 * Returns one page of non-deleted job definitions.
 *
 * <p>When {@code requestValue} carries a non-empty {@code "status"} entry, only jobs
 * with that status are returned; otherwise every job whose status is not
 * {@code TaskStateEnum.COMPLETE} is returned. For each row with a non-blank
 * {@code jobDataMapString}, the string is parsed into the transient {@code jobDataMap}.
 *
 * @param pageRequest  paging information passed to the repository
 * @param requestValue request parameters; only the {@code "status"} key is read
 * @return the page wrapped for the front-end table, with its rows set to the page content
 * @throws SchedulerException declared by the signature
 */
public PageResultForBootstrap<JobEntity> getAllJobs(PageRequest pageRequest, Map requestValue) throws SchedulerException{
    Specification<JobEntity> filter = (root, query, cb) -> {
        Object status = requestValue.get("status");
        Predicate byStatus;
        if (org.springframework.util.StringUtils.isEmpty(status)) {
            // No explicit status requested: hide completed jobs.
            byStatus = cb.notEqual(root.get("jobStatus").as(String.class), TaskStateEnum.COMPLETE.getIndex());
        } else {
            byStatus = cb.equal(root.get("jobStatus").as(String.class), status.toString());
        }
        Predicate notDeleted = cb.equal(root.get("isDelete").as(Boolean.class), Boolean.FALSE);
        return cb.and(byStatus, notDeleted);
    };

    Page<JobEntity> page = jobEntityRepository.findAll(filter, pageRequest);
    PageResultForBootstrap<JobEntity> result = new PageResultForBootstrap<>(page);

    // Rebuild the transient data map from its stored string form.
    for (JobEntity entity : page.getContent()) {
        String json = entity.getJobDataMapString();
        if (StringUtils.isNotBlank(json)) {
            entity.setJobDataMap(JSON.parseObject(json, Map.class));
        }
    }

    result.setRows(page.getContent());
    return result;
}
/**
 * Schedules a new cron job, or reschedules an existing one, and persists its definition.
 *
 * <p>The job name and group are also used as the trigger key. If no trigger exists
 * under that key, a job detail and cron trigger are created and scheduled; otherwise
 * the existing trigger is rebuilt with the new cron expression and rescheduled.
 *
 * <p>A row already stored under the same name and group is reused rather than
 * duplicated, because {@code deleteJob} only soft-deletes rows and the repository
 * lookup expects a single result per name/group pair.
 *
 * @param job the job definition to schedule; {@code null} is rejected
 * @return {@code false} when {@code job} is {@code null} or its cron expression is
 *         rejected by {@code TaskUtils.isValidExpression}; {@code true} otherwise
 * @throws SchedulerException if the scheduler rejects the job or trigger
 */
@Transactional
public boolean addJob(JobEntity job) throws SchedulerException {
    if (job == null) {
        return false;
    }
    if (!TaskUtils.isValidExpression(job.getCronExpression())) {
        log.error("时间表达式错误("+job.getJobName()+","+job.getJobGroup()+"),"+job.getCronExpression());
        return false;
    }

    Scheduler scheduler = schedulerFactoryBean.getScheduler();
    // Naming convention for job name / group: task_1 .. / group_1 ..
    TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup());
    // Look the row up before touching the scheduler, so a lookup failure leaves the scheduler untouched.
    JobEntity stored = jobEntityRepository.findByJobNameAndJobGroup(job.getJobName(), job.getJobGroup());
    CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
    CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
    boolean triggerExisted = trigger != null;

    if (!triggerExisted) {
        // Pick the job class according to whether concurrent execution is disallowed.
        boolean disallowConcurrent = JobEntity.CONCURRENT_DISALLOWED.equals(job.getIsConcurrentDisallowed());
        Class<? extends Job> clazz = disallowConcurrent ? QuartzJobFactoryDisallowConcurrentExecution.class : QuartzJobFactory.class;
        JobBuilder jobBuilder = JobBuilder.newJob(clazz).withIdentity(job.getJobName(), job.getJobGroup());
        if (StringUtils.isNotBlank(job.getDescription())) {
            jobBuilder.withDescription(job.getDescription());
        }
        JobDetail jobDetail = jobBuilder.build();
        if (job.getJobDataMap() != null) {
            jobDetail.getJobDataMap().putAll(job.getJobDataMap());
        }
        trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
        scheduler.scheduleJob(jobDetail, trigger);
    } else {
        // Trigger already exists: rebuild it with the new cron expression and reschedule.
        trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
        scheduler.rescheduleJob(triggerKey, trigger);
    }

    JobEntity job2Save;
    if (triggerExisted && stored != null) {
        // Update the stored definition in place, keeping its id, status and counters.
        job2Save = stored;
        job2Save.setJobDataMapString(job.getJobDataMap() != null ? JSON.toJSONString(job.getJobDataMap()) : null);
        job2Save.setSpringId(job.getSpringId());
        job2Save.setJobClass(job.getJobClass());
        job2Save.setMethodName(job.getMethodName());
        job2Save.setDescription(job.getDescription());
        job2Save.setCronExpression(job.getCronExpression());
    } else {
        job2Save = job;
        if (stored != null && job2Save.getId() == null) {
            // A row (e.g. a soft-deleted one) already exists under this name/group:
            // overwrite it instead of inserting a duplicate.
            job2Save.setId(stored.getId());
        }
        if (job2Save.getJobDataMap() != null) {
            job2Save.setJobDataMapString(JSON.toJSONString(job2Save.getJobDataMap()));
        }
    }

    // Persist the definition together with the trigger details.
    if (job2Save.getJobStatus() == null) {
        job2Save.setJobStatus(TaskStateEnum.NORMAL.getIndex());
    }
    job2Save.setIsDelete(false);
    job2Save.setStartTime(trigger.getStartTime());
    job2Save.setNextTime(trigger.getNextFireTime());
    job2Save.setTriggerName(trigger.getKey().getName());
    job2Save.setTriggerGroup(trigger.getKey().getGroup());
    job2Save.setTriggerTimeZone(trigger.getTimeZone().getID());
    jobEntityRepository.save(job2Save);
    return true;
}
/**
 * Pauses a job in the scheduler and records the paused status on its stored definition.
 *
 * <p>The scheduler is called before the row is modified, so a scheduler failure
 * (which is caught and reported as {@code false}) cannot leave the table claiming
 * the job is paused while it is still running.
 *
 * @param jobEntity carries the job name and group identifying the job
 * @return {@code true} when the job was paused and its status saved; {@code false}
 *         when no stored definition exists or the scheduler reported an error
 */
@Transactional
public boolean pauseJob(JobEntity jobEntity){
    Scheduler scheduler = schedulerFactoryBean.getScheduler();
    JobKey jobKey = JobKey.jobKey(jobEntity.getJobName(), jobEntity.getJobGroup());
    JobEntity jobFind = jobEntityRepository.findByJobNameAndJobGroup(jobEntity.getJobName(), jobEntity.getJobGroup());
    if (jobFind == null) {
        log.error("暂停任务异常: 未找到任务 " + jobEntity.toString());
        return false;
    }
    try {
        scheduler.pauseJob(jobKey);
    } catch (SchedulerException e) {
        log.error("暂停任务异常: " + jobEntity.toString(), e);
        return false;
    }
    jobFind.setJobStatus(TaskStateEnum.PAUSED.getIndex());
    jobEntityRepository.save(jobFind);
    return true;
}
/**
 * Resumes a job in the scheduler and records the normal status on its stored definition.
 *
 * <p>The scheduler is called before the row is modified, so a scheduler failure
 * (which is caught and reported as {@code false}) cannot leave the table claiming
 * the job is running while it is still paused.
 *
 * @param jobEntity carries the job name and group identifying the job
 * @return {@code true} when the job was resumed and its status saved; {@code false}
 *         when no stored definition exists or the scheduler reported an error
 */
@Transactional
public boolean resumeJob(JobEntity jobEntity){
    Scheduler scheduler = schedulerFactoryBean.getScheduler();
    JobKey jobKey = JobKey.jobKey(jobEntity.getJobName(), jobEntity.getJobGroup());
    JobEntity jobFind = jobEntityRepository.findByJobNameAndJobGroup(jobEntity.getJobName(), jobEntity.getJobGroup());
    if (jobFind == null) {
        log.error("恢复任务异常: 未找到任务 " + jobEntity.toString());
        return false;
    }
    try {
        scheduler.resumeJob(jobKey);
    } catch (SchedulerException e) {
        log.error("恢复任务异常: " + jobEntity.toString(), e);
        return false;
    }
    jobFind.setJobStatus(TaskStateEnum.NORMAL.getIndex());
    jobEntityRepository.save(jobFind);
    return true;
}
/**
 * Removes a job from the scheduler and soft-deletes its stored definition
 * (status set to {@code TaskStateEnum.DELETE}, {@code isDelete} set to {@code true}).
 *
 * <p>The scheduler is called before the row is modified, so a scheduler failure
 * (which is caught and reported as {@code false}) cannot leave the row marked as
 * deleted while the job is still scheduled.
 *
 * @param jobEntity carries the job name and group identifying the job
 * @return {@code true} when the job was removed and the row updated; {@code false}
 *         when no stored definition exists or the scheduler reported an error
 */
@Transactional
public boolean deleteJob(JobEntity jobEntity){
    Scheduler scheduler = schedulerFactoryBean.getScheduler();
    JobKey jobKey = JobKey.jobKey(jobEntity.getJobName(), jobEntity.getJobGroup());
    JobEntity jobFind = jobEntityRepository.findByJobNameAndJobGroup(jobEntity.getJobName(), jobEntity.getJobGroup());
    if (jobFind == null) {
        log.error("删除任务异常: 未找到任务 " + jobEntity.toString());
        return false;
    }
    try {
        scheduler.deleteJob(jobKey);
    } catch (SchedulerException e) {
        log.error("删除任务异常: " + jobEntity.toString(), e);
        return false;
    }
    jobFind.setJobStatus(TaskStateEnum.DELETE.getIndex());
    jobFind.setIsDelete(true);
    jobEntityRepository.save(jobFind);
    return true;
}
/**
 * Asks the scheduler to trigger the given job immediately.
 *
 * @param jobEntity carries the job name and group identifying the job
 * @throws SchedulerException if the scheduler cannot trigger the job
 */
public void executeOnce(JobEntity jobEntity) throws SchedulerException{
    schedulerFactoryBean.getScheduler()
            .triggerJob(JobKey.jobKey(jobEntity.getJobName(), jobEntity.getJobGroup()));
}
}
③ 支持并发的 JobFactory
不支持并发的 Factory 仅仅在类上增加注解:@DisallowConcurrentExecution
@Log4j
public class QuartzJobFactoryDisallowConcurrentExecution implements Job {
// Injected repository used by execute() to load and save the job's JobEntity row.
@Autowired
JobEntityRepository jobEntityRepository;
/**
 * Runs the target configured for the fired job and records the outcome on its
 * stored definition: status, run counter, failure counter, previous and next fire time.
 *
 * <p>A run whose target raises a {@code SchedulerException} or a runtime exception
 * is recorded as a failure (status {@code TaskStateEnum.ERROR}); the exception is
 * logged and not rethrown. If no stored definition matches the fired job key, the
 * run is skipped and an error is logged.
 *
 * @param context Quartz execution context of the fired trigger
 */
@Override
@Transactional
public void execute(JobExecutionContext context){
    JobKey jobKey = context.getJobDetail().getKey();
    JobEntity jobEntityFind = jobEntityRepository.findByJobNameAndJobGroup(jobKey.getName(), jobKey.getGroup());
    if (jobEntityFind == null) {
        log.error("运行任务失败,未找到任务定义: 任务名称 = [" + jobKey.getName() + "],任务group = [" + jobKey.getGroup() + "]");
        return;
    }
    log.info("运行任务名称 = [" + jobEntityFind.getJobName() + "],任务group = [" + jobEntityFind.getJobGroup() + "]");
    if (jobEntityFind.getJobDataMap() == null) {
        // jobDataMap is @Transient, so it is empty on a freshly loaded entity; without this
        // the target would always receive an empty map. Use the data Quartz holds for the job.
        // NOTE(review): this is the map stored when the job was first scheduled — confirm
        // whether jobDataMapString should take precedence for rescheduled jobs.
        jobEntityFind.setJobDataMap(context.getMergedJobDataMap());
    }
    try {
        TaskUtils.invokeMethod(jobEntityFind);
        // No further fire time means the trigger has finished its schedule.
        if (context.getNextFireTime() == null) {
            jobEntityFind.setJobStatus(TaskStateEnum.COMPLETE.getIndex());
        } else {
            jobEntityFind.setJobStatus(TaskStateEnum.NORMAL.getIndex());
        }
    } catch (SchedulerException | RuntimeException e) {
        // Runtime failures are recorded too, so the counters below stay accurate.
        log.error(e.getMessage(), e);
        jobEntityFind.setJobStatus(TaskStateEnum.ERROR.getIndex());
        jobEntityFind.setJobExceptionCount(jobEntityFind.getJobExceptionCount() + 1);
    }
    jobEntityFind.setJobExecCount(jobEntityFind.getJobExecCount() + 1);
    jobEntityFind.setPreviousTime(new Date());
    jobEntityFind.setNextTime(context.getNextFireTime());
    jobEntityRepository.save(jobEntityFind);
}
}
④ 工具类
@Log4j
public class TaskUtils {
/**
 * Invokes, via reflection, the target method configured on a job definition.
 *
 * <p>The target object is resolved by Spring bean id when {@code springId} is not
 * blank, otherwise by instantiating {@code jobClass} through its no-argument
 * constructor. The method named by {@code methodName} (default {@code "execute"})
 * must be declared on the target's class and take a single {@code Map} parameter;
 * it receives the definition's {@code jobDataMap}, or an empty map when that is null.
 *
 * <p>If no target can be resolved, an error is logged and the call returns
 * without throwing.
 *
 * @param jobEntity the job definition describing what to invoke
 * @throws SchedulerException if the class cannot be loaded or instantiated, the
 *         method cannot be found or called, or the method itself throws; the
 *         underlying exception is attached as the cause
 */
public static void invokeMethod(JobEntity jobEntity) throws SchedulerException {
    Object object = null;
    // Prefer lookup by Spring bean id; fall back to the class name.
    if (StringUtils.isNotBlank(jobEntity.getSpringId())) {
        object = SpringUtils.getBean(jobEntity.getSpringId());
    } else if (StringUtils.isNotBlank(jobEntity.getJobClass())) {
        try {
            object = Class.forName(jobEntity.getJobClass()).getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            log.error("job class 查找异常!", e);
            throw new SchedulerException("job class 查找异常!", e);
        }
    }
    if (object == null) {
        log.error("任务名称 = [" + jobEntity.getJobName() + "]---------------未启动成功,请检查执行类是否配置正确!!!");
        return;
    }

    Map<String, Object> jobDataMap = jobEntity.getJobDataMap();
    if (jobDataMap == null) {
        jobDataMap = new HashMap<String, Object>();
    }

    String methodName = StringUtils.isBlank(jobEntity.getMethodName()) ? "execute" : jobEntity.getMethodName();
    Method method;
    try {
        method = object.getClass().getDeclaredMethod(methodName, Map.class);
    } catch (NoSuchMethodException e) {
        log.error("任务名称 = [" + jobEntity.getJobName() + "]---------------未启动成功,请检查执行类的方法名是否设置错误!!!", e);
        throw new SchedulerException("job method 查找异常!", e);
    } catch (SecurityException e) {
        log.error("job method 查找异常!", e);
        throw new SchedulerException("job method 查找异常!", e);
    }

    try {
        method.invoke(object, jobDataMap);
    } catch (IllegalAccessException | IllegalArgumentException e) {
        log.error("job method 参数异常!", e);
        throw new SchedulerException("job method 参数异常!", e);
    } catch (InvocationTargetException e) {
        // The target method itself threw: report the real failure, not the reflection wrapper.
        Throwable cause = e.getCause() != null ? e.getCause() : e;
        log.error("job method 执行异常!", cause);
        throw new SchedulerException("job method 执行异常!", cause);
    }
}
/**
 * Checks whether a cron expression is usable for scheduling: it must parse and
 * must yield a first fire time later than the current time.
 *
 * @param cronExpression the expression to check; {@code null} is treated as invalid
 * @return {@code true} if the expression parses and has a future fire time,
 *         {@code false} otherwise
 */
public static boolean isValidExpression(final String cronExpression){
    if (cronExpression == null) {
        // Quartz rejects a null expression with IllegalArgumentException rather than
        // ParseException; answer "invalid" instead of letting that escape.
        return false;
    }
    CronTriggerImpl trigger = new CronTriggerImpl();
    try {
        trigger.setCronExpression(cronExpression);
        Date date = trigger.computeFirstFireTime(null);
        return date != null && date.after(new Date());
    } catch (ParseException e) {
        log.error("cron 表达式:"+ cronExpression +"解析错误", e);
    }
    return false;
}
}
四、tips
spring quartz 天生支持集群,只需要在自定义的配置文件(一般是 quartz.properties)中添加:
# 设置为 true,告诉 Scheduler 实例它要参与到一个集群当中
org.quartz.jobStore.isClustered=true
此操作即告诉它处于集群环境。
源码链接



文章评论
为何如此优秀