创意电子

标题: Spring定时任务Quartz执行全过程源码解读 [打印本页]

作者: 指尖代码    时间: 2021-9-21 00:00
标题: Spring定时任务Quartz执行全过程源码解读
一、前言介绍

在日常开辟中常常会用到定时任务,用来;库表扫描发送MQ、T+n账单结算、缓存数据更新、秒杀活动状态变更,等等。由于有了Spring的Schedule极大的方便了我们对这类场景的使用。那么,除了应用你还了解它多少呢;
蒙圈了吧,是不感觉平时只是使用了,根本没关注过这些。有种冲动赶紧搜索答案吧!但只是知道答案是没有多少意义的,扛不住问不说,也不了解原理。所以,假如你想真的提升自己技能,还是要从根本搞定。
二、案例工程

为了更好的做源码分析,我们将平时用的定时任务服务单独抽离出来。工程下载,关注公众号:bugstack虫洞栈,回复:源码分析
itstack-demo-code-schedule└── src    ├── main    │   ├── java    │   │   └── org.itstack.demo    │   │       ├── DemoTask.java    │   │       └── JobImpl.java       │   └── resources            │       ├── props            │       │   └── config.properties    │       ├── spring    │       │   └── spring-config-schedule-task.xml    │       ├── logback.xml    │       └── spring-config.xml    └── test         └── java             └── org.itstack.demo.test                 ├── ApiTest.java                 ├── MyQuartz.java                                                 └── MyTask.java三、环境设置

四、源码分析

    org.quartz-scheduler    quartz    2.3.2依靠于Spring版本升级quartz选择2.3.2,同时假如你如本文案例中所示使用xml设置任务。那么会有如下更改;
Spring 3.x/org.springframework.scheduling.quart.CronTriggerBean
            
Spring 4.x/org.springframework.scheduling.quartz.CronTriggerFactoryBean
            在正式分析前,可以看下quartz的默认设置,很多初始化动作都要从这里取得参数,同样你可以设置自己的设置文件。例如,当你的任务很多时,默认初始化的10个线程组不满足你的业务需求,就可以按需调整。
quart.properties
# Default Properties file for use by StdSchedulerFactory# to create a Quartz Scheduler Instance, if a different# properties file is not explicitly specified.#org.quartz.scheduler.instanceName: DefaultQuartzSchedulerorg.quartz.scheduler.rmi.export: falseorg.quartz.scheduler.rmi.proxy: falseorg.quartz.scheduler.wrapJobExecutionInUserTransaction: falseorg.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPoolorg.quartz.threadPool.threadCount: 10org.quartz.threadPool.threadPriority: 5org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: trueorg.quartz.jobStore.misfireThreshold: 60000org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore1. 从一个简单案例开始

平时我们使用Schedule基本都是注解或者xml设置文件,但是为了可以更简单的分析代码,我们从一个简单的Demo入手,放到main函数中。
DemoTask.java & 定义一个等候被执行的任务
public class DemoTask {    private Logger logger = LoggerFactory.getLogger(DemoTask.class);    public void execute() throws Exception{        logger.info("定时处理用户信息任务:0/5 * * * * ?");    }}
MyTask.java & 测试类,将设置在xml中的代码抽离出来
public class MyTask {    public static void main(String[] args) throws Exception {        DemoTask demoTask = new DemoTask();        // 定义了;执行的内容        MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();        methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);        methodInvokingJobDetailFactoryBean.setTargetMethod("execute");        methodInvokingJobDetailFactoryBean.setConcurrent(true);        methodInvokingJobDetailFactoryBean.setName("demoTask");        methodInvokingJobDetailFactoryBean.afterPropertiesSet();        // 定义了;执行的筹划        CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();        cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());        cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");        cronTriggerFactoryBean.setName("demoTask");        cronTriggerFactoryBean.afterPropertiesSet();        // 实现了;执行的功能        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();        schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject());        schedulerFactoryBean.setAutoStartup(true);        schedulerFactoryBean.afterPropertiesSet();        schedulerFactoryBean.start();        // 暂停住        System.in.read();    }}假如一切顺遂,那么会有如下效果:
2021-09-21 10:47:16.369 [main] INFO  org.quartz.impl.StdSchedulerFactory[1220] - Using default implementation for ThreadExecutor2021-09-21 10:47:16.421 [main] INFO  org.quartz.core.SchedulerSignalerImpl[61] - Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl2021-09-21 10:47:16.422 [main] INFO  org.quartz.core.QuartzScheduler[229] - Quartz Scheduler v.2.3.2 created.2021-09-21 10:47:16.423 [main] INFO  org.quartz.simpl.RAMJobStore[155] - RAMJobStore initialized.2021-09-21 10:47:16.424 [main] INFO  org.quartz.core.QuartzScheduler[294] - Scheduler meta-data: Quartz Scheduler (v2.3.2) 'QuartzScheduler' with instanceId 'NON_CLUSTERED'  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.  NOT STARTED.  Currently in standby mode.  Number of jobs executed: 0  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.2021-09-21 10:47:16.424 [main] INFO  org.quartz.impl.StdSchedulerFactory[1374] - Quartz scheduler 'QuartzScheduler' initialized from an externally provided properties instance.2021-09-21 10:47:16.424 [main] INFO  org.quartz.impl.StdSchedulerFactory[1378] - Quartz scheduler version: 2.3.22021-09-21 10:47:16.426 [main] INFO  org.quartz.core.QuartzScheduler[2293] - JobFactory set to: org.springframework.scheduling.quartz.AdaptableJobFactory@3e9b10102021-09-21 10:47:16.651 [main] INFO  org.quartz.core.QuartzScheduler[547] - Scheduler QuartzScheduler_$_NON_CLUSTERED started.九月 21, 2021 10:47:16 上午 org.springframework.scheduling.quartz.SchedulerFactoryBean startScheduler信息: Starting Quartz Scheduler now2021-09-21 10:47:20.321 [QuartzScheduler_Worker-1] INFO  org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?2021-09-21 10:47:25.001 [QuartzScheduler_Worker-2] INFO  org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?2021-09-21 10:47:30.000 [QuartzScheduler_Worker-3] INFO  org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?2021-09-21 10:47:35.001 [QuartzScheduler_Worker-4] INFO  org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?2021-09-21 10:47:40.000 [QuartzScheduler_Worker-5] INFO  org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?Process finished with exit code -12. 定义执行内容(MethodInvokingJobDetailFactoryBean)

// 定义了;执行的内容MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);methodInvokingJobDetailFactoryBean.setTargetMethod("execute");methodInvokingJobDetailFactoryBean.setConcurrent(true);methodInvokingJobDetailFactoryBean.setName("demoTask");methodInvokingJobDetailFactoryBean.afterPropertiesSet();这块内容主要将我们的任务体(即待执行任务DemoTask)交给MethodInvokingJobDetailFactoryBean管理,首先设置须要信息;
最后我们通过手动调用 afterPropertiesSet() 来模仿初始化。假如我们的类是交给 Spring 管理的,那么在实现了 InitializingBean 接口的类,在类设置信息加载后会自动执行 afterPropertiesSet() 。一般实现了 InitializingBean 接口的类,同时也会去实现 FactoryBean 接口,由于这个接口实现后就可以通过 T getObject() 获取自己自定义初始化的类。这也常常用在一些框架开辟中。
MethodInvokingJobDetailFactoryBean.afterPropertiesSet()
public void afterPropertiesSet() throws ClassNotFoundException, NoSuchMethodException {        prepare();        // Use specific name if given, else fall back to bean name.        String name = (this.name != null ? this.name : this.beanName);        // Consider the concurrent flag to choose between stateful and stateless job.        Class jobClass = (this.concurrent ? MethodInvokingJob.class : StatefulMethodInvokingJob.class);        // Build JobDetail instance.        JobDetailImpl jdi = new JobDetailImpl();        jdi.setName(name);        jdi.setGroup(this.group);        jdi.setJobClass((Class) jobClass);        jdi.setDurability(true);        jdi.getJobDataMap().put("methodInvoker", this);        this.jobDetail = jdi;                postProcessJobDetail(this.jobDetail);}
3. 定义执行筹划(CronTriggerFactoryBeann)

// 定义了;执行的筹划CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");cronTriggerFactoryBean.setName("demoTask");cronTriggerFactoryBean.afterPropertiesSet();这一块主要定义任务的执行筹划,并将任务执行内容交给 CronTriggerFactoryBean 管理,同时设置须要信息;
CronTriggerFactoryBean.afterPropertiesSet()
@Overridepublic void afterPropertiesSet() throws ParseException {            // ... 校验属性信息                CronTriggerImpl cti = new CronTriggerImpl();        cti.setName(this.name);        cti.setGroup(this.group);        if (this.jobDetail != null) {                cti.setJobKey(this.jobDetail.getKey());        }        cti.setJobDataMap(this.jobDataMap);        cti.setStartTime(this.startTime);        cti.setCronExpression(this.cronExpression);        cti.setTimeZone(this.timeZone);        cti.setCalendarName(this.calendarName);        cti.setPriority(this.priority);        cti.setMisfireInstruction(this.misfireInstruction);        cti.setDescription(this.description);        this.cronTrigger = cti;}
public void setCronExpression(String cronExpression) throws ParseException {      TimeZone origTz = getTimeZone();      this.cronEx = new CronExpression(cronExpression);      this.cronEx.setTimeZone(origTz);  }
CronExpression.java & 解析Cron表达式

protected void buildExpression(String expression) throws ParseException {      expressionParsed = true;      try {                                  // ... 初始化 TreeSet xxx = new TreeSet();                                  int exprOn = SECOND;          StringTokenizer exprsTok = new StringTokenizer(expression, " \t",                  false);                                                  while (exprsTok.hasMoreTokens() && exprOn instantiate()<li data-track="159">源码1323行: tp.initialize();</ul>
SimpleThreadPool.initialize() & 这里的count是默认设置中的数量,可以更改

// create the worker threads and start them   Iterator workerThreads = createWorkerThreads(count).iterator();   while(workerThreads.hasNext()) {       WorkerThread wt = workerThreads.next();       wt.start();       availWorkers.add(wt);   }5. 启动定时任务

案例中使用硬编码方式调用 schedulerFactoryBean.start() 启动线程服务。线程的协作通过Object sigLock来实现,关于sigLock.wait()方法都在QuartzSchedulerThread的run方法里面,所以sigLock唤醒的是只有线程QuartzSchedulerThread。焦点流程如下;

                               
登录/注册后可看大图

这个启动过程中,焦点的代码类,如下;
QuartzScheduler.start() & 启动
public void start() throws SchedulerException {    if (shuttingDown|| closed) {        throw new SchedulerException(                "The Scheduler cannot be restarted after shutdown() has been called.");    }            // QTZ-212 : calling new schedulerStarting() method on the listeners    // right after entering start()    notifySchedulerListenersStarting();            if (initialStart == null) {        initialStart = new Date();        this.resources.getJobStore().schedulerStarted();                    startPlugins();    } else {        resources.getJobStore().schedulerResumed();    }            // 唤醒线程        schedThread.togglePause(false);            getLog().info(            "Scheduler " + resources.getUniqueIdentifier() + " started.");        notifySchedulerListenersStarted();}
QuartzSchedulerThread.run() & 执行过程
@Overridepublic void run() {    int acquiresFailed = 0;                // 只有调用了halt()方法,才会退出这个死循环    while (!halted.get()) {        try {                                                // 一、假如是暂停状态,则循环超时等候1000毫秒            // wait a bit, if reading from job store is consistently failing (e.g. DB is down or restarting)..                               // 阻塞直到有空闲的线程可用并返回可用的数量            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();            if(availThreadCount > 0) {                                        List triggers;                long now = System.currentTimeMillis();                clearSignaledSchedulingChange();                                                try {                                        // 二、获取acquire状态的Trigger列表,也就是即将执行的任务                    triggers = qsRsrcs.getJobStore().acquireNextTriggers(                            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBat                    acquiresFailed = 0;                    if (log.isDebugEnabled())                        log.debug("batch acquisition of " + (triggers == null ? 0 : triggers                } catch(){//...}                                                if (triggers != null && !triggers.isEmpty()) {                                                            // 三:获取List第一个Trigger的下次触发时候                                        long triggerTime = triggers.get(0).getNextFireTime().getTime();                                                            // 四:获取任务触发聚集                                        List res = qsRsrcs.getJobStore().triggersFired(triggers);                                                                                // 五:设置Triggers为&#39;executing&#39;状态                                        qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));                                                            // 六:创建JobRunShell                                        qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);                                                                                // 七:执行Job                                        qsRsrcs.getThreadPool().runInThread(shell)                                                            continue; // while (!halted)                }            } else { // if(availThreadCount > 0)                // should never happen, if threadPool.blockForAvailableThreads() follows con                continue; // while (!halted)            }                                            } catch(RuntimeException re) {            getLog().error("Runtime error occurred in main trigger firing loop.", re);        }    }        qs = null;    qsRsrcs = null;}
JobRunShell shell = null;  try {      shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);      shell.initialize(qs);  } catch (SchedulerException se) {      qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);      continue;  }
  // 保存所有WorkerThread的聚集  private List workers;  // 空闲的WorkerThread聚集  private LinkedList availWorkers = new LinkedList();  // 任务的WorkerThread聚集  private LinkedList busyWorkers = new LinkedList();  /**   * 维护workers、availWorkers和busyWorkers三个列表数据   * 有任务需要一个线程出来执行:availWorkers.removeFirst();busyWorkers.add()   * 然后调用WorkThread.run(runnable)方法   */  public boolean runInThread(Runnable runnable) {      if (runnable == null) {          return false;      }      synchronized (nextRunnableLock) {          handoffPending = true;          // Wait until a worker thread is available          while ((availWorkers.size() < 1) && !isShutdown) {              try {                  nextRunnableLock.wait(500);              } catch (InterruptedException ignore) {              }          }          if (!isShutdown) {              WorkerThread wt = (WorkerThread)availWorkers.removeFirst();              busyWorkers.add(wt);              wt.run(runnable);          } else {              // If the thread pool is going down, execute the Runnable              // within a new additional worker thread (no thread from the pool).                                              WorkerThread wt = new WorkerThread(this, threadGroup,                      "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);              busyWorkers.add(wt);              workers.add(wt);              wt.start();          }          nextRunnableLock.notifyAll();          handoffPending = false;      }      return true;  }
public void run(Runnable newRunnable) {      synchronized(lock) {          if(runnable != null) {              throw new IllegalStateException("Already running a Runnable!");          }          runnable = newRunnable;          lock.notifyAll();      }  }
@Override  public void run() {      boolean ran = false;                      while (run.get()) {          try {              synchronized(lock) {                  while (runnable == null && run.get()) {                      lock.wait(500);                  }                  if (runnable != null) {                      ran = true;                      // 启动真正执行的内容,runnable就是JobRunShell                      runnable.run();                  }              }          } cache(){//...}      }      //if (log.isDebugEnabled())      try {          getLog().debug("WorkerThread is shut down.");      } catch(Exception e) {          // ignore to help with a tomcat glitch      }  }
JobRunShell.run() & 从上面WorkerThread.run(),调用到这里执行
public void run() {    qs.addInternalSchedulerListener(this);    try {        OperableTrigger trigger = (OperableTrigger) jec.getTrigger();        JobDetail jobDetail = jec.getJobDetail();        do {            // ...            long startTime = System.currentTimeMillis();            long endTime = startTime;            // execute the job            try {                log.debug("Calling execute on job " + jobDetail.getKey());                                                // 执行业务代码,也就是我们的task                                job.execute(jec);                                                endTime = System.currentTimeMillis();            } catch (JobExecutionException jee) {                endTime = System.currentTimeMillis();                jobExEx = jee;                getLog().info("Job " + jobDetail.getKey() +                        " threw a JobExecutionException: ", jobExEx);            } catch (Throwable e) {                endTime = System.currentTimeMillis();                getLog().error("Job " + jobDetail.getKey() +                        " threw an unhandled Exception: ", e);                SchedulerException se = new SchedulerException(                        "Job threw an unhandled exception.", e);                qs.notifySchedulerListenersError("Job ("                        + jec.getJobDetail().getKey()                        + " threw an exception.", se);                jobExEx = new JobExecutionException(se, false);            }            jec.setJobRunTime(endTime - startTime);            // 其他代码        } while (true);    } finally {        qs.removeInternalSchedulerListener(this);    }}
QuartzJobBean.execte() & 继续往下走
public final void execute(JobExecutionContext context) throws JobExecutionException {        try {                BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(this);                MutablePropertyValues pvs = new MutablePropertyValues();                pvs.addPropertyValues(context.getScheduler().getContext());                pvs.addPropertyValues(context.getMergedJobDataMap());                bw.setPropertyValues(pvs, true);        }        catch (SchedulerException ex) {                throw new JobExecutionException(ex);        }        executeInternal(context);}
MethodInvokingJobDetailFactoryBean->MethodInvokingJob.executeInternal(JobExecutionContext context)
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {        try {                // 反射执行业务代码                context.setResult(this.methodInvoker.invoke());        }        catch (InvocationTargetException ex) {                if (ex.getTargetException() instanceof JobExecutionException) {                        // -> JobExecutionException, to be logged at info level by Quartz                        throw (JobExecutionException) ex.getTargetException();                }                else {                        // -> "unhandled exception", to be logged at error level by Quartz                        throw new JobMethodInvocationFailedException(this.methodInvoker, ex.getTargetException());                }        }        catch (Exception ex) {                // -> "unhandled exception", to be logged at error level by Quartz                throw new JobMethodInvocationFailedException(this.methodInvoker, ex);        }}五、综上总结

http://www.quartz-scheduler.org
quartz-2.1.x/configuration




欢迎光临 创意电子 (https://www.wxcydz.cc/) Powered by Discuz! X3.4