先简单介绍一下quartz,Quartz是一个功能丰富的开源作业调度库,可以集成到几乎任何Java应用程序中 - 从最小的独立应用程序到最大的电子商务系统。quartz可用于创建执行数十,数百甚至数十万个作业的简单或复杂的计划; 任务定义为标准Java组件的任务,可以执行任何可以对其进行编程的任何内容。Quartz Scheduler包含许多企业级功能,例如支持JTA事务和集群。
以上内容来自quartz官网。
我们会针对quartz的几个主要类进行分析,看一下quartz是如何实现定时调度功能
1.1测试Demo
来自官方实例的example11
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71public class SimpleExample {
public void run() throws Exception {
Logger log = LoggerFactory.getLogger(SimpleExample.class);
log.info("------- Initializing ----------------------");
// First we must get a reference to a scheduler
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sched = sf.getScheduler();
log.info("------- Initialization Complete -----------");
// computer a time that is on the next round minute
Date runTime = evenMinuteDate(new Date());
log.info("------- Scheduling Job -------------------");
// define the job and tie it to our HelloJob class
JobDetail job = newJob(HelloJob.class).withIdentity("job1", "group1").build();
// Trigger the job to run on the next round minute
Trigger trigger = newTrigger().withIdentity("trigger1", "group1").startAt(runTime).build();
// Tell quartz to schedule the job using our trigger
sched.scheduleJob(job, trigger);
log.info(job.getKey() + " will run at: " + runTime);
// Start up the scheduler (nothing can actually run until the
// scheduler has been started)
sched.start();
log.info("------- Started Scheduler -----------------");
// wait long enough so that the scheduler as an opportunity to
// run the job!
log.info("------- Waiting 65 seconds... -------------");
try {
// wait 65 seconds to show job
Thread.sleep(65L * 1000L);
// executing...
} catch (Exception e) {
//
}
// shut down the scheduler
log.info("------- Shutting Down ---------------------");
sched.shutdown(true);
log.info("------- Shutdown Complete -----------------");
}
public static void main(String[] args) throws Exception {
SimpleExample example = new SimpleExample();
example.run();
}
}
public class HelloJob implements Job {
private static Logger _log = LoggerFactory.getLogger(HelloJob.class);
public HelloJob() {
}
public void execute(JobExecutionContext context)
throws JobExecutionException {
_log.info("Hello World! - " + new Date());
}
}
1.2 quartz关键API
从上述example中我们可以看到quartz主要接口和类
- Scheduler - 进行作业调度的主要接口.
- Job - 作业接口,编写自己的作业需要实现,如example中的HelloJob
- JobDetail - 作业的详细信息,除了包含作业本身,还包含一些额外的数据。
- Trigger - 作业计划的组件-作业何时执行,执行次数,频率等。
- JobBuilder - 建造者模式创建 JobDetail实例.
- TriggerBuilder - 建造者模式创建 Trigger 实例.
- QuartzSchedulerThread 继承Thread 主要的执行任务线程
从上面的几个接口,可以看到quartz设计非常精妙,将作业和触发器分开设计,同时调度器完成对作业的调度。
了解了几个关键类和接口作用,下面我们来分析整个执行过程。
1.3 执行过程分析
1.3.1 从StdSchedulerFactory获取scheduler
1 |
|
- cfg变量为PropertiesParser实例————是quartz的配置信息(主要是quartz.properties),如果为空,就初始化读取quartz的配置信息。
- SchedulerRepository是一个HashMap,用于存储Scheduler。如果有重名的,判断是否已经停止,是从hashMap删掉,否直接返回已保存实例
- SchedulerRepository未找到,实例化一个scheduler
1 | private Scheduler instantiate() throws SchedulerException { |
instantiate()是一个比较重要的方法,主要从上一段代码介绍的PropertiesParser获取Scheduler配置信息
- 获取Scheduler Properties
- 如果是rmi代理sheduler,则创建RemoteScheduler,并通过scheRep.bind放入SchedulerRepository中,返回scheduler,结束
- 创建class load helper 加载类提供帮助
如果是jmx scheduler 则进行对应操作,并通过scheRep.bind放入SchedulerRepository中,返回scheduler,结束
往下是本地调度,不是远程调度,因此需要获取和本地调度相关的信息
6.获取线程池配置
7.获取Job存储设置,(分为内存存储和数据库存储作业还有一个不太了解的方式,默认是内存存储)
8.设置数据库连接池
9.安装Scheduler插件
10.安装JobListener ,监听作业启动前,作业呗否决执行,作业已经执行,参见[JobListener.class]
11.安装TriggerListener,监听触发器被触发工作将要执行时,触发错过,否决工作执行,工作完成,参见[TriggerListener.class]
12.获取ThreadExecutor配置
13.初始化JobRunShellFactory,这个工厂类很重要,后面会介绍
14.创建标准StdScheduler,返回。
1.3.2 创建JobDetail
JobDeatil包含jobDataMap和jobClass,以及一些描述,名称等等,采用Build模式建造,jobDataMap主要存储一些额外信息。
1.3.3 创建Trigger
Trigger主要包含了一些计划信息,详细可参考接口
准备工作做完了,开始进行调度
1.3.3 scheduler.scheduleJob()
1 | public Date scheduleJob(JobDetail jobDetail, Trigger trigger) |
sched是StdScheduler的一个成员,是QuartzScheduler的实例,在上面的instantiate()方法中实例化过并且用它构造StdScheduler;看一下QuartzScheduler的scheduleJob1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49public Date scheduleJob(JobDetail jobDetail,
Trigger trigger) throws SchedulerException {
validateState();
if (jobDetail == null) {
throw new SchedulerException("JobDetail cannot be null");
}
if (trigger == null) {
throw new SchedulerException("Trigger cannot be null");
}
if (jobDetail.getKey() == null) {
throw new SchedulerException("Job's key cannot be null");
}
if (jobDetail.getJobClass() == null) {
throw new SchedulerException("Job's class cannot be null");
}
OperableTrigger trig = (OperableTrigger)trigger;
if (trigger.getJobKey() == null) {
trig.setJobKey(jobDetail.getKey());
} else if (!trigger.getJobKey().equals(jobDetail.getKey())) {
throw new SchedulerException(
"Trigger does not reference given job!");
}
trig.validate();
Calendar cal = null;
if (trigger.getCalendarName() != null) {
cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
}
Date ft = trig.computeFirstFireTime(cal);
if (ft == null) {
throw new SchedulerException(
"Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
}
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
notifySchedulerListenersJobAdded(jobDetail);
notifySchedulerThread(trigger.getNextFireTime().getTime());
notifySchedulerListenersSchduled(trigger);
return ft;
}
让我们来看一下schedule()方法
- 一些校验,jobDetail,trigger,jobClass判空
- 计算第一次执行的日期,计算日期为空,则抛出异常,job永远不会被调度
- 如果上述通过,通过jobStore存储jobDetail和trigger
- 通知监听器程序:工作加入通知、
1 | public void notifySchedulerListenersJobAdded(JobDetail jobDetail) { |
5.通知监听器程序:通知正在休眠(工作可能都执行完,主线程sigLock.wait())的主执行线程,有工作加入,唤醒主线程
1 | protected void notifySchedulerThread(long candidateNewNextFireTime) { |
6.通知调度器job被调度了
1 | public void notifySchedulerListenersSchduled(Trigger trigger) { |
此时scheduler.scheduleJob()执行完毕
1.3.4 scheduler.start() 未执行此函数,没有什么会真正执行,主线程loop循环一直被wait。
1 | public void start() throws SchedulerException { |
- 判断调度器是否关掉或停止(因为可能开始之前已经将调度器停止或关闭),是则直接抛异常
- 通知监听器,马上就要执行
- 启动一些通知jobStore和一些插件开始执行,如果initial不为空,说明曾经启动过,则重新恢复
- 切换线程开始执行togglePause(false)。此时开始执行, 下面的run()方法选取了一部分,未设置开始之前,while里面还有一个while (paused && !halted.get()) 一直等待变量改变,否则,就一直wait这也就是sche.scheduleJob()未真正启动工作的原因
1 | void togglePause(boolean pause) { |
5.通知任务已经开始执行
1 | public void notifySchedulerListenersStarted() { |
至此,整个流程大约完成了。
具体执行的(run)细节看看有空再讲一下。