目录
前言
前几篇介绍了单体架构的定时任务解决方式,但是现代软件架构由于业务复杂度高,业务的耦合性太强,已经由单体架构拆分成了分布式架构。因此,定时任务的架构也随之修改。而quartz是分布式定时任务kb88凯时官网登录的解决方案中使用简单,结构清晰,且不依赖第三方分布式调度中间件的。上车,mars酱带你车里细说~
角色介绍
quartz入门使用的角色不多,三个角色足够,分别是:
scheduler
:调度器。用来负责任务的调度;
job
:任务。这是一个接口,业务代码继承job接口并实现它的execute
方法,是业务执行的主体部分;
trigger
: 触发器。也是个接口,有两个触发器比较关键,一个是simpletrigger
,另一个是crontrigger
。前者支持简单的定时,比如:按时、按秒等;后者直接支持cron表达式。下面我们从官方的源代码入手,看看quartz如何做到分布式的。
官方例子
官方源代码down下来之后,有个examples文件夹:
example1是入门级中最简单的。就两个java文件,一个hellojob:
package org.quartz.examples.example1; import java.util.date; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.quartz.job; import org.quartz.jobexecutioncontext; import org.quartz.jobexecutionexception; /** ** this is just a simple job that says "hello" to the world. *
* * @author bill kratzer */ public class hellojob implements job { private static logger _log = loggerfactory.getlogger(hellojob.class); /** ** empty constructor for job initilization *
** quartz requires a public empty constructor so that the * scheduler can instantiate the class whenever it needs. *
*/ public hellojob() { } /** ** called by the
* * @throws jobexecutionexception * if there is an exception while executing the job. */ public void execute(jobexecutioncontext context) throws jobexecutionexception { // say hello to the world and display the date/time _log.info("hello world! - " new date()); } }{@link org.quartz.scheduler}
when a *{@link org.quartz.trigger}
fires that is associated with * thejob
. *
另一个simpleexample:
package org.quartz.examples.example1; import org.quartz.jobdetail; import org.quartz.scheduler; import org.quartz.schedulerfactory; import org.quartz.trigger; import org.quartz.impl.stdschedulerfactory; import org.slf4j.logger; import org.slf4j.loggerfactory; import java.util.date; import static org.quartz.datebuilder.evenminutedate; import static org.quartz.jobbuilder.newjob; import static org.quartz.triggerbuilder.newtrigger; /** * this example will demonstrate how to start and shutdown the quartz scheduler and how to schedule a job to run in * quartz. * * @author bill kratzer */ public class simpleexample { public void run() throws exception { logger log = loggerfactory.getlogger(simpleexample.class); log.info("------- initializing ----------------------"); // 1. 创建一个scheduler schedulerfactory sf = new stdschedulerfactory(); scheduler sched = sf.getscheduler(); log.info("------- initialization complete -----------"); // computer a time that is on the next round minute date runtime = evenminutedate(new date()); log.info("------- scheduling job -------------------"); // 2. 指定一个job jobdetail job = newjob(hellojob.class).withidentity("job1", "group1").build(); // 3. 指定一个trigger trigger trigger = newtrigger().withidentity("trigger1", "group1").startat(runtime).build(); // 4. 绑定job和trigger sched.schedulejob(job, trigger); log.info(job.getkey() " will run at: " runtime); // 5. 执行 sched.start(); log.info("------- started scheduler -----------------"); // wait long enough so that the scheduler as an opportunity to // run the job! log.info("------- waiting 65 seconds... -------------"); try { // wait 65 seconds to show job thread.sleep(65l * 1000l); // executing... } catch (exception e) { // } // shut down the scheduler log.info("------- shutting down ---------------------"); sched.shutdown(true); log.info("------- shutdown complete -----------------"); } public static void main(string[] args) throws exception { simpleexample example = new simpleexample(); example.run(); } }
整个simpleexample只有五个步骤:
- 创建scheduler,这是一个调度器,例子中使用调度器工厂来创建一个调度器;
- 创建一个job。实际上job就是那个hellojob,但是这里把hellojob丢给了jobdetail对象,job接口本身只有一个execute函数,没有其他的属性了,如果需要附加其他属性,jobdetail就支持,比如我们需要往job中传递参数,jobdetail中提供了一个jobdatamap。当job在运行的时候,execute函数里面的就能获取到jobdetail对象,并将设置的数据传递给job接口的实现;
- 创建一个trigger。trigger对象主责是任务的执行时间,比如官方例子中的startat函数,就指定了具体的运行时间,还有startnow(立即执行);
- 用scheduler绑定job和trigger;
- 执行scheduler。
quartz的使用是不是简单又清晰?job是任务,单一职责,不做任何其他事情。trigger负责执行的频率等等属性。scheduler负责按照trigger的规则去执行job的内容。各自部分的功能符合单一原则。
但是,到这里都不是分布式的方式,依然是单体架构的。那么,quartz如何做到分布式的呢?
quartz如何分布式?
quartz的分布式实现方式并不依赖其他分布式协调管理中间件完成,而是使用数据锁来实现。使用数据做协调管理中间件的唯一的前提是:需要把集群的每台机器时间校对一致。
quartz数据库核心表如下:
表名 | 功能描述 |
---|---|
qrtz_calendars | 存储quartz的calendar信息 |
qrtz_cron_triggers | 存储crontrigger,包括cron表达式和时区信息 |
qrtz_fired_triggers | 存储与已触发的trigger相关的状态信息,以及相联job的执行信息 |
qrtz_paused_trigger_grps | 存储已暂停的trigger组的信息 |
qrtz_scheduler_state | 存储少量的有关scheduler的状态信息,和别的scheduler实例 |
qrtz_locks | 存储程序的悲观锁的信息 |
qrtz_job_details | 存储每一个已配置的job的详细信息 |
qrtz_job_listeners | 存储有关已配置的joblistener的信息 |
qrtz_simple_triggers | 存储简单的trigger,包括重复次数、间隔、以及已触的次数 |
qrtz_blog_triggers | trigger作为blob类型存储 |
qrtz_trigger_listeners | 存储已配置的triggerlistener的信息 |
qrtz_triggers | 存储已配置的trigger的信息 |
字体加粗的qrtz_locks表是一个悲观锁的存储实现,quartz认为每条记录都可能会产生并发冲突。以上表的sql可以在quartz目录中找到:
找到自己喜欢的数据库品牌,并创建好表即可。
跟着官方例子看源码
我们从hello的execute方法开始,反着找,继续看看分布式的方式如何实现。为什么反着找呢?因为这里是我们业务实现的主体内容,quartz框架最终必须要调用到这个execute的具体实现的。我们找到调用execute的地方有很多处:
从类名来分析,directoryscanjob看着是目录扫描任务,filescanjob直译是文件扫描任务,sendmailjob是发送邮件任务,最后只剩那个jobrunshell,毕竟翻译过来叫“任务运行の核心”啊。进入jobrunshell,找到调用execute函数的部分,execute函数被包裹在一个一百三十多行长又长的run函数中:
public void run() { qs.addinternalschedulerlistener(this); try { // ...省略很多源代码 do { // ...省略很多源代码 try { begin(); } catch (schedulerexception se) { // ... 省略源代码 } // ... 省略源代码 try { log.debug("calling execute on job " jobdetail.getkey()); // 这里负责执行job的execute函数 job.execute(jec); endtime = system.currenttimemillis(); } catch (jobexecutionexception jee) { // ... 省略源代码 } catch (throwable e) { // ... 省略源代码 } // ...省略很多源代码 try { complete(true); } catch (schedulerexception se) { // ... 省略源代码 } // ...省略很多源代码 } while (true); } finally { qs.removeinternalschedulerlistener(this); } }
可以看到run中间的execute被夹在一个begin函数和comlete函数中,而begin和complete的实现是一个基于jta事务的jtajobrunshell的实现来完成的。jobrunshell是一个runnable接口的实现,那么刚刚的run方法,必定在某处启用了线程(池)的start方法。
mars酱继续跟踪查找源代码,在quartzschedulerthread中的run函数中,找到jobrunshell的调用部分:
@override public void run() { int acquiresfailed = 0; while (!halted.get()) { // ...省略很多源代码 // 源代码279行 int availthreadcount = qsrsrcs.getthreadpool().blockforavailablethreads(); // ...省略很多源代码 if(availthreadcount > 0) { // ...省略很多源代码 // 取下一个trigger,周期是30秒取一次 triggers = qsrsrcs.getjobstore().acquirenexttriggers( now idlewaittime, math.min(availthreadcount, qsrsrcs.getmaxbatchsize()), qsrsrcs.getbatchtimewindow()); // ...省略很多源代码 // 触发trigger listres = qsrsrcs.getjobstore().triggersfired(triggers); // ...省略很多源代码 // 释放trigger,当bndle的结果是null就释放trigger if (bndle == null) { qsrsrcs.getjobstore().releaseacquiredtrigger(triggers.get(i)); continue; } // ...省略很多源代码 jobrunshell shell = null; try { shell = qsrsrcs.getjobrunshellfactory().createjobrunshell(bndle); shell.initialize(qs); } catch (schedulerexception se) { qsrsrcs.getjobstore().triggeredjobcomplete(triggers.get(i), bndle.getjobdetail(), completedexecutioninstruction.set_all_job_triggers_error); continue; } // 这里调用jobrunshell if (qsrsrcs.getthreadpool().runinthread(shell) == false) { // ...省略很多源代码 } } } }
quartzschedulerthread的run函数就是核心处理流程了,qsrsrcs.getthreadpool().runinthread(shell)
内部就根据具体的simplethreadpool或者zerosizethreadpool来执行run函数,while循环基本就是不停的在轮询不断的去拿trigger,然后判断trigger的时间是不是到了,再按照时间trigger的时间规则执行job,最后再标记为完成、释放trigger。
trigger的处理
上面的逻辑都是通过qsrsrcs.getjobstore()
得到的对象去处理trigger的,返回对象是jobstore。任意查看qsrsrcs.getjobstore()
调用的函数,比如:releaseacquiredtriggerjobstore,它的实现有两个是比较重要的:一个是ramjobstore,一个是jobstoresupport。前者是ram作为存储介质,作者还特意写上了这样一段注释:
this class implements a jobstore that utilizes ram as its storage device.
as you should know, the ramification of this is that access is extrememly fast, but the data is completely volatile - therefore this jobstore should not be used if true persistence between program shutdowns is required.
这段英文的央视翻译:
这个类实现了一个使用ram作为存储设备的jobstore。
您应该知道,这样做的后果是访问速度非常快,但是数据是完全不稳定的——因此,如果需要在程序关闭之间实现真正的持久性,则不应该使用这个jobstore。
而且内存存储也无法分布式处理吧?所以,mars酱选择了观看jobstoresupport:
从import可以知道,这个玩意是连接了数据库的,所以呢,acquirenexttriggers、triggersfired、releaseacquiredtrigger这些方法负责具体trigger的相关操作,都最终会执行到jobstoresupport的第3844行的executeinnonmanagedtxlock函数:
/** * execute the given callback having optionally acquired the given lock. * this uses the non-managed transaction connection. * * @param lockname the name of the lock to acquire, for example * "trigger_access". if null, then no lock is acquired, but the * lockcallback is still executed in a non-managed transaction. */ protectedt executeinnonmanagedtxlock( string lockname, transactioncallback txcallback, final transactionvalidator txvalidator) throws jobpersistenceexception { boolean transowner = false; connection conn = null; try { if (lockname != null) { // if we aren't using db locks, then delay getting db connection // until after acquiring the lock since it isn't needed. if (getlockhandler().requiresconnection()) { conn = getnonmanagedtxconnection(); } transowner = getlockhandler().obtainlock(conn, lockname); } if (conn == null) { conn = getnonmanagedtxconnection(); } final t result = txcallback.execute(conn); try { commitconnection(conn); } catch (jobpersistenceexception e) { rollbackconnection(conn); if (txvalidator == null || !retryexecuteinnonmanagedtxlock(lockname, new transactioncallback () { @override public boolean execute(connection conn) throws jobpersistenceexception { return txvalidator.validate(conn, result); } })) { throw e; } } long sigtime = clearandgetsignalschedulingchangeontxcompletion(); if(sigtime != null && sigtime >= 0) { signalschedulingchangeimmediately(sigtime); } return result; } catch (jobpersistenceexception e) { rollbackconnection(conn); throw e; } catch (runtimeexception e) { rollbackconnection(conn); throw new jobpersistenceexception("unexpected runtime exception: " e.getmessage(), e); } finally { try { releaselock(lockname, transowner); } finally { cleanupconnection(conn); } } }
整体的过程简要说明就是:获取数据库连接,给需要执行的trigger加锁,处理完之后再释放锁。
结合起来
结合前面的流程来看,一个调度器在执行前如果涉及到分布式的情况,流程如下:
- 首先要获取quartz_locks表中对应的锁(在
executeinnonmanagedtxlock
函数的getlockhandler().obtainlock(conn, lockname)
中); - 获取锁后执行quartzschedulerthread中的jobrunshell,完成任务的执行;
- 最后quartzschedulerthread中调用
triggeredjobcomplete
函数,锁被释放,在executeinnonmanagedtxlock
函数的releaselock(lockname, transowner)
中执行;
集群中的每一个调度器实例都遵循这样的操作流程。
总结
quartz 是一款用于分布式系统的高性能调度框架,它采用了数据库作为分布式锁机制来保证同一时刻只有一个 scheduler 实例访问数据库中的 trigger。
在 quartz 中,调度器线程会争抢访问数据库中的 trigger,以确保在同一时刻只有一个调度器线程执行某个 trigger 的操作。如果有多个调度器线程同时尝试访问同一个 trigger,它们会相互等待对方释放锁。当一个调度器线程获得了锁,它就可以访问数据库并执行相应的操作。
另外,quartz 还采用了悲观锁的策略来避免死锁的发生。当一个调度器线程尝试获取锁时,如果锁已经被其他线程占用,那么这个线程会等待,直到有线程释放了锁。如果在等待过程中没有其他线程释放锁,那么这个线程就会一直等待下去,直到调度器重新分配了锁。
总之,quartz 的分布式调度原理是通过数据库锁和悲观锁来实现的,以保证同一时刻只有一个调度器线程访问数据库中的 trigger,从而提高系统的性能和可靠性。
以上就是一分钟掌握java quartz定时任务的详细内容,更多关于java quartz定时任务的资料请关注其它相关文章!