一、效果预览
二、流程梳理
除了数据库中 quartz 需要的 11 张表,我们同时自定义一张表来维护所有的定时任务信息,包括执行的类,执行的方法,并记录总共运行的次数和运行失败的次数(其实最好再加一张运行记录表)。
三、后端实现
前端代码暂不赘述,详见文末源码链接。如下是一些关键逻辑的代码:
① 自定义实体类:
/**
 * JPA entity mapped to the {@code job_entity} table; one row describes one scheduled job:
 * its identity (name + group), trigger data, the invocation target (Spring bean id or class
 * name plus method name) and run counters.
 */
@Data
@Entity
@Table(name="job_entity")
public class JobEntity {

    public static final String STATUS_RUNNING = "1";
    public static final String STATUS_NOT_RUNNING = "0";
    /** Value of {@link #isConcurrentDisallowed} that selects the non-concurrent job class. */
    public static final String CONCURRENT_DISALLOWED = "1";
    public static final String CONCURRENT_ALLOWED = "0";

    @Id
    @GeneratedValue(strategy= GenerationType.IDENTITY)
    private Long id;

    // Job id (field currently disabled): private String jobId;

    /** Job name. */
    private String jobName;

    /** Job group; job name + group name should be unique. */
    private String jobGroup;

    /** Job status. */
    private String jobStatus;

    /** Job data, stored as a string (the service layer writes it as JSON). */
    private String jobDataMapString;

    /** Map form of the job data; not persisted ({@code @Transient}). */
    @Transient
    private Map<String, Object> jobDataMap;

    /** Trigger name. */
    private String triggerName;

    /** Trigger group. */
    private String triggerGroup;

    /** Trigger time zone. */
    private String triggerTimeZone;

    /** Whether concurrent execution is disallowed (concurrency is allowed by default). */
    private String isConcurrentDisallowed;

    /** Cron expression that defines when the job runs. */
    private String cronExpression;

    /** Job description. */
    private String description;

    /** Bean id under which the target class is registered in Spring; when not empty, the target is looked up by this id. */
    private String springId;

    /** Fully qualified class name of the target, invoked via reflection; used when springId is empty. */
    private String jobClass;

    /** Name of the method the job invokes. */
    private String methodName;

    /** Start time. */
    private Date startTime;

    /** Previous run time. */
    private Date previousTime;

    /** Next run time. */
    private Date nextTime;

    /** Number of times the job has run (failed runs included). */
    private int jobExecCount;

    /** Number of runs that ended with an exception. */
    private int jobExceptionCount;

    @Column(name="create_dt")
    private Timestamp createDate;

    // 1: deleted, 0: not deleted
    @Column(name="is_del")
    private Boolean isDelete;

    public JobEntity(){}
}
② JobService
业务层控制代码: @Service @Log4j public class JobService { @Autowired private SchedulerFactoryBean schedulerFactoryBean; @Autowired JobEntityRepository jobEntityRepository; /** * 获取所有任务 * @return * @throws SchedulerException */ public PageResultForBootstrap<JobEntity> getAllJobs(PageRequest pageRequest, Map requestValue) throws SchedulerException{ Specification specification = (Specification<JobEntity>)(root, query, cb) -> { List<Predicate> predicates = new ArrayList<>(); if (!org.springframework.util.StringUtils.isEmpty(requestValue.get("status"))) { predicates.add(cb.equal(root.get("jobStatus").as(String.class), requestValue.get("status").toString())); } else { predicates.add(cb.notEqual(root.get("jobStatus").as(String.class), TaskStateEnum.COMPLETE.getIndex())); } predicates.add(cb.equal(root.get("isDelete").as(Boolean.class), Boolean.FALSE)); return cb.and(predicates.toArray(new Predicate[predicates.size()])); }; Page<JobEntity> page = jobEntityRepository.findAll(specification, pageRequest); PageResultForBootstrap<JobEntity> result = new PageResultForBootstrap<>(page); page.getContent().forEach(val -> { if (StringUtils.isNotBlank(val.getJobDataMapString())) { val.setJobDataMap(JSON.parseObject(val.getJobDataMapString(), Map.class)); } }); result.setRows(page.getContent()); return result; } /** * 添加任务 */ @Transactional public boolean addJob(JobEntity job) throws SchedulerException { if (job == null) { return false; } if(!TaskUtils.isValidExpression(job.getCronExpression())){ log.error("时间表达式错误("+job.getJobName()+","+job.getJobGroup()+"),"+job.getCronExpression()); return false; }else{ JobEntity job2Save; Scheduler scheduler = schedulerFactoryBean.getScheduler(); // 任务名称和任务组设置规则: // 名称:task_1 .. // 组 :group_1 .. TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup()); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); // 不存在,创建一个 if (null == trigger) { //是否允许并发执行 Class<? 
extends Job> clazz = job.getIsConcurrentDisallowed() != null && JobEntity.CONCURRENT_DISALLOWED.equals(job.getIsConcurrentDisallowed()) ? QuartzJobFactoryDisallowConcurrentExecution.class : QuartzJobFactory.class; JobBuilder jobBuilder = JobBuilder.newJob(clazz).withIdentity(job.getJobName(), job.getJobGroup()); if (StringUtils.isNotBlank(job.getDescription())) { // 备注配置 jobBuilder.withDescription(job.getDescription()); } JobDetail jobDetail = jobBuilder.build(); if (job.getJobDataMap() != null) { // 参数配置 jobDetail.getJobDataMap().putAll(job.getJobDataMap()); } // 表达式调度构建器 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression()); // 按新的表达式构建一个新的trigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(scheduleBuilder).build(); scheduler.scheduleJob(jobDetail, trigger); job2Save = job; if (job2Save.getJobDataMap() != null){ job2Save.setJobDataMapString(JSON.toJSONString(job2Save.getJobDataMap())); } } else { // trigger已存在,则更新相应的定时设置 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression()); // 按新的cronExpression表达式重新构建trigger trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build(); // 按新的trigger重新设置job执行 scheduler.rescheduleJob(triggerKey, trigger); job2Save = jobEntityRepository.findByJobNameAndJobGroup(job.getJobName(), job.getJobGroup()); if (job.getJobDataMap() != null){ job2Save.setJobDataMapString(JSON.toJSONString(job.getJobDataMap())); } else { job2Save.setJobDataMapString(null); } job2Save.setSpringId(job.getSpringId()); job2Save.setJobClass(job.getJobClass()); job2Save.setMethodName(job.getMethodName()); job2Save.setDescription(job.getDescription()); job2Save.setCronExpression(job.getCronExpression()); } // 自定义实体类存入数据库 if (job2Save.getJobStatus() == null) { job2Save.setJobStatus(TaskStateEnum.NORMAL.getIndex()); } job2Save.setIsDelete(false); job2Save.setStartTime(trigger.getStartTime()); 
job2Save.setNextTime(trigger.getNextFireTime()); job2Save.setTriggerName(trigger.getKey().getName()); job2Save.setTriggerGroup(trigger.getKey().getGroup()); job2Save.setTriggerTimeZone(trigger.getTimeZone().getID()); jobEntityRepository.save(job2Save); } return true; } /** * 暂停任务 * @param jobEntity * @return */ @Transactional public boolean pauseJob(JobEntity jobEntity){ Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(jobEntity.getJobName(), jobEntity.getJobGroup()); try { JobEntity jobFind = jobEntityRepository.findByJobNameAndJobGroup(jobEntity.getJobName(), jobEntity.getJobGroup()); jobFind.setJobStatus(TaskStateEnum.PAUSED.getIndex()); jobEntityRepository.save(jobFind); scheduler.pauseJob(jobKey); return true; } catch (SchedulerException e) { log.error("暂停任务异常: " + jobEntity.toString(), e); } return false; } /** * 恢复任务 * @param jobEntity * @return */ @Transactional public boolean resumeJob(JobEntity jobEntity){ Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(jobEntity.getJobName(), jobEntity.getJobGroup()); try { JobEntity jobFind = jobEntityRepository.findByJobNameAndJobGroup(jobEntity.getJobName(), jobEntity.getJobGroup()); jobFind.setJobStatus(TaskStateEnum.NORMAL.getIndex()); jobEntityRepository.save(jobFind); scheduler.resumeJob(jobKey); return true; } catch (SchedulerException e) { log.error("恢复任务异常: " + jobEntity.toString(), e); } return false; } /** * 删除任务 */ @Transactional public boolean deleteJob(JobEntity jobEntity){ Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(jobEntity.getJobName(), jobEntity.getJobGroup()); try{ JobEntity jobFind = jobEntityRepository.findByJobNameAndJobGroup(jobEntity.getJobName(), jobEntity.getJobGroup()); jobFind.setJobStatus(TaskStateEnum.DELETE.getIndex()); jobFind.setIsDelete(true); jobEntityRepository.save(jobFind); scheduler.deleteJob(jobKey); return true; } catch (SchedulerException e) { 
log.error("删除任务异常: " + jobEntity.toString(), e); } return false; } /** * 立即执行一个任务 * @param jobEntity * @throws SchedulerException */ public void executeOnce(JobEntity jobEntity) throws SchedulerException{ Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(jobEntity.getJobName(), jobEntity.getJobGroup()); scheduler.triggerJob(jobKey); } }
③ 支持并发的 JobFactory
不支持并发的 Factory 仅仅在类上增加注解:
/**
 * Quartz job that loads the {@link JobEntity} matching the fired job key, invokes its configured
 * target through {@code TaskUtils.invokeMethod}, and records the outcome (status, counters,
 * previous/next fire time) on the entity.
 */
@DisallowConcurrentExecution
@Log4j
public class QuartzJobFactoryDisallowConcurrentExecution implements Job {

    @Autowired
    JobEntityRepository jobEntityRepository;

    /**
     * Runs the target of the fired job and saves the updated bookkeeping entity.
     *
     * @param context Quartz execution context of the fired job
     */
    @Override
    @Transactional
    public void execute(JobExecutionContext context){
        JobKey jobKey = context.getJobDetail().getKey();
        JobEntity jobEntityFind = jobEntityRepository.findByJobNameAndJobGroup(jobKey.getName(), jobKey.getGroup());
        if (jobEntityFind == null) {
            // Nothing to invoke or update without the stored definition.
            log.error("运行任务异常, 任务记录不存在: 任务名称 = [" + jobKey.getName() + "],任务group = [" + jobKey.getGroup() + "]");
            return;
        }
        log.info("运行任务名称 = [" + jobEntityFind.getJobName() + "],任务group = [" + jobEntityFind.getJobGroup() + "]");

        // jobDataMap is @Transient, so a freshly loaded entity carries none; fall back to the
        // parameters stored on the Quartz JobDetail/trigger so the target does not always
        // receive an empty map.
        // NOTE(review): these are the parameters as of scheduling time; confirm whether later
        // edits to jobDataMapString should take precedence.
        if (jobEntityFind.getJobDataMap() == null) {
            jobEntityFind.setJobDataMap(context.getMergedJobDataMap());
        }

        try {
            TaskUtils.invokeMethod(jobEntityFind);
            if (context.getNextFireTime() == null) {
                // No further fire time: the schedule is finished.
                jobEntityFind.setJobStatus(TaskStateEnum.COMPLETE.getIndex());
            } else {
                jobEntityFind.setJobStatus(TaskStateEnum.NORMAL.getIndex());
            }
        } catch (SchedulerException | RuntimeException e) {
            // Runtime failures (e.g. from the bean lookup) are recorded like any other failed run.
            log.error(e.getMessage(), e);
            jobEntityFind.setJobStatus(TaskStateEnum.ERROR.getIndex());
            jobEntityFind.setJobExceptionCount(jobEntityFind.getJobExceptionCount() + 1);
        }

        jobEntityFind.setJobExecCount(jobEntityFind.getJobExecCount() + 1);
        jobEntityFind.setPreviousTime(new Date());
        jobEntityFind.setNextTime(context.getNextFireTime());
        jobEntityRepository.save(jobEntityFind);
    }
}
④ 工具类
@Log4j public class TaskUtils { /** * 通过反射调用scheduleJob中定义的方法 * * @param jobEntity */ public static void invokeMethod(JobEntity jobEntity) throws SchedulerException { Object object = null; Class<?> clazz = null; //springId不为空先按springId查找bean if (StringUtils.isNotBlank(jobEntity.getSpringId())) { object = SpringUtils.getBean(jobEntity.getSpringId()); } else if (StringUtils.isNotBlank(jobEntity.getJobClass())) {//按jobClass查找 try { clazz = Class.forName(jobEntity.getJobClass()); object = clazz.newInstance(); } catch (Exception e) { log.error(e); throw new SchedulerException("job class 查找异常!"); } } if (object == null) { log.error("任务名称 = [" + jobEntity.getJobName() + "]---------------未启动成功,请检查执行类是否配置正确!!!"); return; } clazz = object.getClass(); Method method = null; Map<String, Object> jobDataMap = new HashMap<>(); if (jobEntity.getJobDataMap() != null) jobDataMap = jobEntity.getJobDataMap(); try { if (StringUtils.isBlank(jobEntity.getMethodName())) { method = clazz.getDeclaredMethod("execute", Map.class); } else { method = clazz.getDeclaredMethod(jobEntity.getMethodName(), Map.class); } } catch (NoSuchMethodException e) { log.error("任务名称 = [" + jobEntity.getJobName() + "]---------------未启动成功,请检查执行类的方法名是否设置错误!!!", e); throw new SchedulerException("job method 查找异常!"); } catch (SecurityException e) { log.error(e); throw new SchedulerException("job method 查找异常!"); } if (method != null) { try { method.invoke(object, jobDataMap); } catch (IllegalAccessException e) { // TODO Auto-generated catch block log.error(e); throw new SchedulerException("job method 参数异常!"); } catch (IllegalArgumentException e) { // TODO Auto-generated catch block log.error(e); throw new SchedulerException("job method 参数异常!"); } catch (InvocationTargetException e) { // TODO Auto-generated catch block log.error(e); throw new SchedulerException("job method 参数异常!"); } } } /** * 判断cron时间表达式正确性 * @param cronExpression * @return */ public static boolean isValidExpression(final String cronExpression){ 
CronTriggerImpl trigger = new CronTriggerImpl(); try { trigger.setCronExpression(cronExpression); Date date = trigger.computeFirstFireTime(null); return date != null && date.after(new Date()); } catch (ParseException e) { log.error("cron 表达式:"+ cronExpression +"解析错误", e); } return false; } }
四、tips
spring quartz 是天生支持集群的,只需要在自定义的配置文件(一般是 quartz.properties)中添加:
# 设置true 告诉了Scheduler实例要它参与到一个集群当中
org.quartz.jobStore.isClustered=true
此操作即告诉它处于集群环境。
源码链接
文章评论
为何如此优秀