一分钟掌握java quartz定时任务-kb88凯时官网登录

来自:网络
时间:2023-05-17
阅读:
免费资源网 - https://freexyz.cn/
目录

前言

前几篇介绍了单体架构的定时任务解决方式,但是现代软件架构由于业务复杂度高,业务的耦合性太强,已经由单体架构拆分成了分布式架构。因此,定时任务的架构也随之修改。而quartz是分布式定时任务kb88凯时官网登录的解决方案中使用简单,结构清晰,且不依赖第三方分布式调度中间件的。上车,mars酱带你车里细说~

角色介绍

quartz入门使用的角色不多,三个角色足够,分别是:

scheduler:调度器。用来负责任务的调度;

job:任务。这是一个接口,业务代码继承job接口并实现它的execute方法,是业务执行的主体部分;

trigger: 触发器。也是个接口,有两个触发器比较关键,一个是simpletrigger,另一个是crontrigger。前者支持简单的定时,比如:按时、按秒等;后者直接支持cron表达式。下面我们从官方的源代码入手,看看quartz如何做到分布式的。

官方例子

官方源代码down下来之后,有个examples文件夹:

example1是入门级中最简单的。就两个java文件,一个hellojob:

package org.quartz.examples.example1;
import java.util.date;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.quartz.job;
import org.quartz.jobexecutioncontext;
import org.quartz.jobexecutionexception;
/**
 * 

* this is just a simple job that says "hello" to the world. *

* * @author bill kratzer */ public class hellojob implements job { private static logger _log = loggerfactory.getlogger(hellojob.class); /** *

* empty constructor for job initilization *

*

* quartz requires a public empty constructor so that the * scheduler can instantiate the class whenever it needs. *

*/ public hellojob() { } /** *

* called by the {@link org.quartz.scheduler} when a * {@link org.quartz.trigger} fires that is associated with * the job. *

* * @throws jobexecutionexception * if there is an exception while executing the job. */ public void execute(jobexecutioncontext context) throws jobexecutionexception { // say hello to the world and display the date/time _log.info("hello world! - " new date()); } }

另一个simpleexample:

 package org.quartz.examples.example1;
 import org.quartz.jobdetail;
 import org.quartz.scheduler;
 import org.quartz.schedulerfactory;
 import org.quartz.trigger;
 import org.quartz.impl.stdschedulerfactory;
 import org.slf4j.logger;
 import org.slf4j.loggerfactory;
 import java.util.date;
 import static org.quartz.datebuilder.evenminutedate;
 import static org.quartz.jobbuilder.newjob;
 import static org.quartz.triggerbuilder.newtrigger;
 /**
  * this example will demonstrate how to start and shutdown the quartz scheduler and how to schedule a job to run in
  * quartz.
  *
  * @author bill kratzer
  */
 public class simpleexample {
     public void run() throws exception {
         logger log = loggerfactory.getlogger(simpleexample.class);
         log.info("------- initializing ----------------------");
         // 1. 创建一个scheduler
         schedulerfactory sf = new stdschedulerfactory();
         scheduler sched = sf.getscheduler();
         log.info("------- initialization complete -----------");
         // computer a time that is on the next round minute
         date runtime = evenminutedate(new date());
         log.info("------- scheduling job  -------------------");
         // 2. 指定一个job
         jobdetail job = newjob(hellojob.class).withidentity("job1", "group1").build();
         // 3. 指定一个trigger
         trigger trigger = newtrigger().withidentity("trigger1", "group1").startat(runtime).build();
         // 4. 绑定job和trigger
         sched.schedulejob(job, trigger);
         log.info(job.getkey()   " will run at: "   runtime);
         // 5. 执行
         sched.start();
         log.info("------- started scheduler -----------------");
         // wait long enough so that the scheduler as an opportunity to
         // run the job!
         log.info("------- waiting 65 seconds... -------------");
         try {
             // wait 65 seconds to show job
             thread.sleep(65l * 1000l);
             // executing...
         } catch (exception e) {
             //
         }
         // shut down the scheduler
         log.info("------- shutting down ---------------------");
         sched.shutdown(true);
         log.info("------- shutdown complete -----------------");
     }
     public static void main(string[] args) throws exception {
         simpleexample example = new simpleexample();
         example.run();
     }
 }

整个simpleexample只有五个步骤:

  • 创建scheduler,这是一个调度器,例子中使用调度器工厂来创建一个调度器;
  • 创建一个job。实际上job就是那个hellojob,但是这里把hellojob丢给了jobdetail对象,job接口本身只有一个execute函数,没有其他的属性了,如果需要附加其他属性,jobdetail就支持,比如我们需要往job中传递参数,jobdetail中提供了一个jobdatamap。当job在运行的时候,execute函数里面的就能获取到jobdetail对象,并将设置的数据传递给job接口的实现;
  • 创建一个trigger。trigger对象主责是任务的执行时间,比如官方例子中的startat函数,就指定了具体的运行时间,还有startnow(立即执行);
  • 用scheduler绑定job和trigger;
  • 执行scheduler。

quartz的使用是不是简单又清晰?job是任务,单一职责,不做任何其他事情。trigger负责执行的频率等等属性。scheduler负责按照trigger的规则去执行job的内容。各自部分的功能符合单一原则。

但是,到这里都不是分布式的方式,依然是单体架构的。那么,quartz如何做到分布式的呢?

quartz如何分布式?

quartz的分布式实现方式并不依赖其他分布式协调管理中间件完成,而是使用数据锁来实现。使用数据做协调管理中间件的唯一的前提是:需要把集群的每台机器时间校对一致。

quartz数据库核心表如下:

表名功能描述
qrtz_calendars存储quartz的calendar信息
qrtz_cron_triggers存储crontrigger,包括cron表达式和时区信息
qrtz_fired_triggers存储与已触发的trigger相关的状态信息,以及相联job的执行信息
qrtz_paused_trigger_grps存储已暂停的trigger组的信息
qrtz_scheduler_state存储少量的有关scheduler的状态信息,和别的scheduler实例
qrtz_locks存储程序的悲观锁的信息
qrtz_job_details存储每一个已配置的job的详细信息
qrtz_job_listeners存储有关已配置的joblistener的信息
qrtz_simple_triggers存储简单的trigger,包括重复次数、间隔、以及已触的次数
qrtz_blog_triggerstrigger作为blob类型存储
qrtz_trigger_listeners存储已配置的triggerlistener的信息
qrtz_triggers存储已配置的trigger的信息

字体加粗的qrtz_locks表是一个悲观锁的存储实现,quartz认为每条记录都可能会产生并发冲突。以上表的sql可以在quartz目录中找到:

找到自己喜欢的数据库品牌,并创建好表即可。

跟着官方例子看源码

我们从hello的execute方法开始,反着找,继续看看分布式的方式如何实现。为什么反着找呢?因为这里是我们业务实现的主体内容,quartz框架最终必须要调用到这个execute的具体实现的。我们找到调用execute的地方有很多处:

从类名来分析,directoryscanjob看着是目录扫描任务,filescanjob直译是文件扫描任务,sendmailjob是发送邮件任务,最后只剩那个jobrunshell,毕竟翻译过来叫“任务运行の核心”啊。进入jobrunshell,找到调用execute函数的部分,execute函数被包裹在一个一百三十多行长又长的run函数中:

public void run() {
    qs.addinternalschedulerlistener(this);
    try {
        // ...省略很多源代码
        do {
            // ...省略很多源代码
            try {
                begin();
            } catch (schedulerexception se) {
                // ... 省略源代码
            }
            // ... 省略源代码
            try {
                log.debug("calling execute on job "   jobdetail.getkey());
                // 这里负责执行job的execute函数
                job.execute(jec);
                endtime = system.currenttimemillis();
            } catch (jobexecutionexception jee) {
                // ... 省略源代码
            } catch (throwable e) {
                // ... 省略源代码
            }
            // ...省略很多源代码
            try {
                complete(true);
            } catch (schedulerexception se) {
                // ... 省略源代码
            }
            // ...省略很多源代码
        } while (true);
    } finally {
        qs.removeinternalschedulerlistener(this);
    }
}

可以看到run中间的execute被夹在一个begin函数和comlete函数中,而begin和complete的实现是一个基于jta事务的jtajobrunshell的实现来完成的。jobrunshell是一个runnable接口的实现,那么刚刚的run方法,必定在某处启用了线程(池)的start方法。

mars酱继续跟踪查找源代码,在quartzschedulerthread中的run函数中,找到jobrunshell的调用部分:

@override
public void run() {
    int acquiresfailed = 0;
    while (!halted.get()) {
        // ...省略很多源代码
        // 源代码279行
        int availthreadcount = qsrsrcs.getthreadpool().blockforavailablethreads();
        // ...省略很多源代码
        if(availthreadcount > 0) { 
            // ...省略很多源代码
            // 取下一个trigger,周期是30秒取一次
            triggers = qsrsrcs.getjobstore().acquirenexttriggers(
                now   idlewaittime, math.min(availthreadcount, qsrsrcs.getmaxbatchsize()), qsrsrcs.getbatchtimewindow());
            // ...省略很多源代码
            // 触发trigger
            list res = qsrsrcs.getjobstore().triggersfired(triggers);
            // ...省略很多源代码
            // 释放trigger,当bndle的结果是null就释放trigger
            if (bndle == null) {
                qsrsrcs.getjobstore().releaseacquiredtrigger(triggers.get(i));
                continue;
            }
            // ...省略很多源代码
            jobrunshell shell = null;
            try {
                shell = qsrsrcs.getjobrunshellfactory().createjobrunshell(bndle);
                shell.initialize(qs);
            } catch (schedulerexception se) {
                qsrsrcs.getjobstore().triggeredjobcomplete(triggers.get(i), bndle.getjobdetail(), completedexecutioninstruction.set_all_job_triggers_error);
                continue;
            }
            // 这里调用jobrunshell
            if (qsrsrcs.getthreadpool().runinthread(shell) == false) {
                // ...省略很多源代码
            }
        }  
    }
}

quartzschedulerthread的run函数就是核心处理流程了,qsrsrcs.getthreadpool().runinthread(shell)内部就根据具体的simplethreadpool或者zerosizethreadpool来执行run函数,while循环基本就是不停的在轮询不断的去拿trigger,然后判断trigger的时间是不是到了,再按照时间trigger的时间规则执行job,最后再标记为完成、释放trigger。

trigger的处理

上面的逻辑都是通过qsrsrcs.getjobstore()得到的对象去处理trigger的,返回对象是jobstore。任意查看qsrsrcs.getjobstore()调用的函数,比如:releaseacquiredtriggerjobstore,它的实现有两个是比较重要的:一个是ramjobstore,一个是jobstoresupport。前者是ram作为存储介质,作者还特意写上了这样一段注释:

this class implements a jobstore that utilizes ram as its storage device.

as you should know, the ramification of this is that access is extrememly fast, but the data is completely volatile - therefore this jobstore should not be used if true persistence between program shutdowns is required.

这段英文的央视翻译:

这个类实现了一个使用ram作为存储设备的jobstore。

您应该知道,这样做的后果是访问速度非常快,但是数据是完全不稳定的——因此,如果需要在程序关闭之间实现真正的持久性,则不应该使用这个jobstore。

而且内存存储也无法分布式处理吧?所以,mars酱选择了观看jobstoresupport:

从import可以知道,这个玩意是连接了数据库的,所以呢,acquirenexttriggers、triggersfired、releaseacquiredtrigger这些方法负责具体trigger的相关操作,都最终会执行到jobstoresupport的第3844行的executeinnonmanagedtxlock函数:

    /**
     * execute the given callback having optionally acquired the given lock.
     * this uses the non-managed transaction connection.
     * 
     * @param lockname the name of the lock to acquire, for example
     * "trigger_access".  if null, then no lock is acquired, but the
     * lockcallback is still executed in a non-managed transaction. 
     */
    protected  t executeinnonmanagedtxlock(
            string lockname, 
            transactioncallback txcallback, final transactionvalidator txvalidator) throws jobpersistenceexception {
        boolean transowner = false;
        connection conn = null;
        try {
            if (lockname != null) {
                // if we aren't using db locks, then delay getting db connection 
                // until after acquiring the lock since it isn't needed.
                if (getlockhandler().requiresconnection()) {
                    conn = getnonmanagedtxconnection();
                }
                transowner = getlockhandler().obtainlock(conn, lockname);
            }
            if (conn == null) {
                conn = getnonmanagedtxconnection();
            }
            final t result = txcallback.execute(conn);
            try {
                commitconnection(conn);
            } catch (jobpersistenceexception e) {
                rollbackconnection(conn);
                if (txvalidator == null || !retryexecuteinnonmanagedtxlock(lockname, new transactioncallback() {
                    @override
                    public boolean execute(connection conn) throws jobpersistenceexception {
                        return txvalidator.validate(conn, result);
                    }
                })) {
                    throw e;
                }
            }
            long sigtime = clearandgetsignalschedulingchangeontxcompletion();
            if(sigtime != null && sigtime >= 0) {
                signalschedulingchangeimmediately(sigtime);
            }
            return result;
        } catch (jobpersistenceexception e) {
            rollbackconnection(conn);
            throw e;
        } catch (runtimeexception e) {
            rollbackconnection(conn);
            throw new jobpersistenceexception("unexpected runtime exception: "
                      e.getmessage(), e);
        } finally {
            try {
                releaselock(lockname, transowner);
            } finally {
                cleanupconnection(conn);
            }
        }
    }

整体的过程简要说明就是:获取数据库连接,给需要执行的trigger加锁,处理完之后再释放锁。

结合起来

结合前面的流程来看,一个调度器在执行前如果涉及到分布式的情况,流程如下:

  • 首先要获取quartz_locks表中对应的锁(在executeinnonmanagedtxlock函数的getlockhandler().obtainlock(conn, lockname)中);
  • 获取锁后执行quartzschedulerthread中的jobrunshell,完成任务的执行;
  • 最后quartzschedulerthread中调用triggeredjobcomplete函数,锁被释放,在executeinnonmanagedtxlock函数的releaselock(lockname, transowner)中执行;

集群中的每一个调度器实例都遵循这样的操作流程。

总结

quartz 是一款用于分布式系统的高性能调度框架,它采用了数据库作为分布式锁机制来保证同一时刻只有一个 scheduler 实例访问数据库中的 trigger。

在 quartz 中,调度器线程会争抢访问数据库中的 trigger,以确保在同一时刻只有一个调度器线程执行某个 trigger 的操作。如果有多个调度器线程同时尝试访问同一个 trigger,它们会相互等待对方释放锁。当一个调度器线程获得了锁,它就可以访问数据库并执行相应的操作。

另外,quartz 还采用了悲观锁的策略来避免死锁的发生。当一个调度器线程尝试获取锁时,如果锁已经被其他线程占用,那么这个线程会等待,直到有线程释放了锁。如果在等待过程中没有其他线程释放锁,那么这个线程就会一直等待下去,直到调度器重新分配了锁。

总之,quartz 的分布式调度原理是通过数据库锁和悲观锁来实现的,以保证同一时刻只有一个调度器线程访问数据库中的 trigger,从而提高系统的性能和可靠性。

以上就是一分钟掌握java quartz定时任务的详细内容,更多关于java quartz定时任务的资料请关注其它相关文章!

免费资源网 - https://freexyz.cn/
返回顶部
顶部
网站地图