Xxl-Job scheduler: an analysis of how it works

Source code for this analysis: https://gitee.com/lidishan/xxl-job-code-analysis
xxl-job version: 2.3.0

Xxl-Job is divided into an executor and a scheduler. The client code we normally write acts as an executor. When an executor starts, it automatically registers itself with the scheduler, and the scheduler then triggers it remotely.

The scheduler initialization process consists of the following steps:

1 Internationalization
Configuration parameter: xxl.job.i18n=zh_CN (set to Simplified Chinese here)

2 Initialize the fast thread pool fastTriggerPool and the slow thread pool slowTriggerPool
Configuration parameters:
xxl.job.triggerpool.fast.max=200 sets the maximum number of threads of fastTriggerPool to 200; the value cannot be lower than 200
xxl.job.triggerpool.slow.max=100 sets the maximum number of threads of slowTriggerPool to 100; the value cannot be lower than 100

3 Start the registration monitor thread
3.1 Initialize the thread pool registryOrRemoveThreadPool: when a client calls the api/registry or api/registryRemove interface, the registration or removal is processed on this thread pool
3.2 Start the registration monitor thread registryMonitorThread: remove registrations whose last heartbeat is more than 90s old, and refresh the registered addresses of each group

4 Start the failed-task monitor thread (retry, alarm)
Configuration parameter: [email protected] (alert email)

5 Start the monitoring thread
5.1 Initialize the callbackThreadPool thread pool: when a client calls the api/callback interface, the callback is processed on this thread pool
5.2 Start the monitor thread monitorThread: if a scheduling record stays in the "running" state for more than 10 minutes and the corresponding executor's heartbeat registration has lapsed (the executor is offline), the scheduler marks that record as failed

6 Start the log statistics and cleanup thread logrThread
- Refresh the log report for the last three days (daily counts of failures, successes, runs, and so on)
- Once a day, clear invalid and expired log data
Configuration parameter: xxl.job.logretentiondays=30 sets the retention period of the xxl-job database logs; if the value is less than 7, logs are never cleared

7 Start task scheduling (very important! These two threads do the core work: one puts jobs into the time wheel, the other takes jobs out of the time wheel and triggers them)

7.1 scheduleThread: fetch the jobs that are about to run and put them into the time wheel (the producer side)
- Step 1: Use select for update on a database table as a distributed lock, so that multiple xxl-job admin scheduler nodes do not schedule at the same time
- Step 2: Pre-read data. Read from the database the jobs that are due within the next five seconds, with a page size of preReadCount (6000 rows with the default settings)
    preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
- Step 3: Compare the current time with the job's next trigger time. There are three cases:
    * The current time is later than (next trigger time + PRE_READ_MS (5s)). This can happen when the scan or query takes too long, so the job is already more than five seconds overdue by the time it is processed and needs special handling:
        1. Match the misfire strategy: DO_NOTHING = do nothing and discard the missed trigger; FIRE_ONCE_NOW = trigger once immediately
        2. Refresh the last trigger time and the next trigger time
    * The current time is later than the next trigger time, but by no more than five seconds:
        1. Trigger the job directly
        2. Refresh the last trigger time and the next trigger time
        3. If the new next trigger time is within five seconds, put the job straight into the time wheel:
            1. Compute which second of the minute the next trigger time falls on (ringSecond)
            2. Put the job ID into the time wheel under ringSecond
            3. Refresh the last trigger time and the next trigger time
    * The current time has not yet reached the next trigger time:
        1. Compute which second of the minute the next trigger time falls on (ringSecond)
        2. Put the job ID into the time wheel under ringSecond
        3. Refresh the last trigger time and the next trigger time
- Step 4: Update the job's trigger information in the database, such as trigger_last_time and trigger_next_time
- Step 5: Commit the database transaction, which releases the select for update exclusive lock

7.2 ringThread: trigger jobs according to the time wheel (the consumer side)
The time wheel data structure is: Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>()
- Step 1: Get the current second within the minute, then loop twice. The second iteration picks up the jobs of the previous slot (tick) in case that slot was skipped, so that no slot is missed
- Step 2: Execute the trigger
- Step 3: Clear the list of jobs taken from the current slot
    * A strategy is selected while the trigger executes:
        Blocking strategy: serial execution, discard later, cover earlier
        Routing strategy: first, last, minimum distribution, consistent hash, fast failure, LFU (least frequently used), LRU (least recently used), random, round robin

The entry point is XxlJobAdminConfig. The code is as follows:

    @Component
    public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

        private static XxlJobAdminConfig adminConfig = null;
        public static XxlJobAdminConfig getAdminConfig() {
            return adminConfig;
        }

        // ---------------------- XxlJobScheduler ----------------------

        private XxlJobScheduler xxlJobScheduler;

        @Override
        public void afterPropertiesSet() throws Exception {  // Lifecycle callback after property injection: initialize xxlJobScheduler
            adminConfig = this;

            // Initialize the xxl-job scheduler
            xxlJobScheduler = new XxlJobScheduler();
            xxlJobScheduler.init();
        }

        @Override
        public void destroy() throws Exception {  // Lifecycle callback on destruction: destroy xxlJobScheduler
            xxlJobScheduler.destroy();
        }

        // ..............omitted..............
    }

xxlJobScheduler.init() performs the following initialization steps:

    public class XxlJobScheduler {
        private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);

        public void init() throws Exception {
            // 1 Internationalization (init i18n)
            initI18n();

            // 2 Initialize the fast thread pool fastTriggerPool and the slow thread pool slowTriggerPool (admin trigger pool start)
            JobTriggerPoolHelper.toStart();

            // 3 Start the registration monitor thread (admin registry monitor run)
            JobRegistryHelper.getInstance().start();

            // 4 Start the failed-task monitor thread: retry, alarm (admin fail-monitor run)
            JobFailMonitorHelper.getInstance().start();

            // 5 Start the monitoring thread: if a scheduling record stays in the "running" state for more than
            //   10 minutes and the corresponding executor's heartbeat registration has lapsed, the record is
            //   marked as failed (admin lose-monitor run, depends on JobTriggerPoolHelper)
            JobCompleteHelper.getInstance().start();

            // 6 Start the log statistics and cleanup thread: refresh the log report of the last three days
            //   (daily failures, successes, runs, and so on) and clear invalid and expired log data once a day
            //   (admin log report start)
            JobLogReportHelper.getInstance().start();

            // 7 Start task scheduling: scheduleThread puts due jobs into the time wheel, ringThread triggers
            //   jobs according to the time wheel (start-schedule, depends on JobTriggerPoolHelper)
            JobScheduleHelper.getInstance().start();

            logger.info(">>>>>>>>> init xxl-job admin success.");
        }

        // ......omitted......
    }
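
As an illustration of step 3.2 in the overview, here is a minimal in-memory sketch. It is not the xxl-job source. It drops registrations whose last heartbeat is more than 90 seconds old and joins the remaining addresses of each app into a sorted, comma-separated list. The names RegistryRefreshSketch, Registration, and liveAddressLists are hypothetical and chosen only for this example; the real scheduler reads and writes this data through its database DAOs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical illustration only; not part of xxl-job. */
final class RegistryRefreshSketch {

    // A registration is treated as dead when its heartbeat is older than 90s (value taken from the text).
    static final long DEAD_TIMEOUT_MS = 90_000L;

    /** One registry row: which app, which address, and when the last heartbeat arrived. */
    static final class Registration {
        final String appname;
        final String address;
        final long lastBeatMs;

        Registration(String appname, String address, long lastBeatMs) {
            this.appname = appname;
            this.address = address;
            this.lastBeatMs = lastBeatMs;
        }
    }

    /** Returns appname -> "addr1,addr2,..." built only from registrations that are still alive. */
    static Map<String, String> liveAddressLists(List<Registration> registrations, long nowMs) {
        // TreeSet keeps each app's addresses sorted and free of duplicates.
        Map<String, TreeSet<String>> byApp = new TreeMap<>();
        for (Registration r : registrations) {
            if (nowMs - r.lastBeatMs > DEAD_TIMEOUT_MS) {
                continue; // heartbeat too old: skip the dead address
            }
            byApp.computeIfAbsent(r.appname, k -> new TreeSet<>()).add(r.address);
        }
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, TreeSet<String>> e : byApp.entrySet()) {
            result.put(e.getKey(), String.join(",", e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<Registration> regs = new ArrayList<>();
        regs.add(new Registration("app-a", "http://10.0.0.2:9999", now - 10_000L));
        regs.add(new Registration("app-a", "http://10.0.0.1:9999", now - 30_000L));
        regs.add(new Registration("app-b", "http://10.0.0.3:9999", now - 120_000L)); // dead
        System.out.println(liveAddressLists(regs, now));
    }
}
```

In this sketch an app with no live address simply has no entry in the result map.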


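To make the three cases of step 7.1 in the overview concrete, the following sketch reduces the decision to a pure function and also reproduces the preReadCount arithmetic. It is an illustration under stated assumptions rather than the scheduler's actual code: the class ScheduleDecisionSketch, the Decision enum, and both method names are hypothetical, and only the 5s window and the (fast + slow) * 20 formula come from the text.

```java
/** Hypothetical illustration only; not part of xxl-job. */
final class ScheduleDecisionSketch {

    // Pre-read window of five seconds, as described in the text.
    static final long PRE_READ_MS = 5000L;

    enum Decision {
        MISFIRE,            // more than 5s overdue: apply the misfire strategy
        TRIGGER_NOW,        // overdue by at most 5s: trigger directly
        PUSH_TO_TIME_WHEEL  // not yet due: put into the time wheel
    }

    /** Page size for the pre-read query: (fast pool max + slow pool max) * 20. */
    static int preReadCount(int triggerPoolFastMax, int triggerPoolSlowMax) {
        return (triggerPoolFastMax + triggerPoolSlowMax) * 20;
    }

    /** Classifies a job by comparing the current time with its next trigger time (both in ms). */
    static Decision classify(long nowMs, long triggerNextTimeMs) {
        if (nowMs > triggerNextTimeMs + PRE_READ_MS) {
            return Decision.MISFIRE;
        } else if (nowMs > triggerNextTimeMs) {
            return Decision.TRIGGER_NOW;
        } else {
            return Decision.PUSH_TO_TIME_WHEEL;
        }
    }

    public static void main(String[] args) {
        long now = 100_000L;
        System.out.println("preReadCount = " + preReadCount(200, 100));
        System.out.println("6s overdue  -> " + classify(now, now - 6_000L));
        System.out.println("2s overdue  -> " + classify(now, now - 2_000L));
        System.out.println("due in 3s   -> " + classify(now, now + 3_000L));
    }
}
```

With the default pool sizes of 200 and 100, preReadCount evaluates to 6000, which matches the figure quoted in step 2.
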
The seven initialization steps above are broken down below.

1 Internationalization

    private void initI18n() {  // Set the titles to Chinese, English, etc. according to the configured locale
        for (ExecutorBlockStrategyEnum item : ExecutorBlockStrategyEnum.values()) {
            item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
        }
    }

2 Initialize the fast thread pool fastTriggerPool and the slow thread pool slowTriggerPool
This step initializes two thread pools, fastTriggerPool and slowTriggerPool.
When a trigger is dispatched, either the fast or the slow pool is chosen. If a job has timed out more than 10 times within one minute, it is handled by slowTriggerPool, as follows:

    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) {  // job-timeout 10 times in 1 min: use slowTriggerPool
        triggerPool_ = slowTriggerPool;
    }
    triggerPool_.execute(new Runnable() {
        // .........omitted............
    });

3 Start the registration monitor thread
3.1 Initialize the thread pool registryOrRemoveThreadPool: when a client calls the api/registry or api/registryRemove interface, the registration or removal is processed on this thread pool
3.2 Start the registration monitor thread registryMonitorThread: remove registrations whose last heartbeat is more than 90s old, and refresh the registered addresses of each group

    public void start() {

        // Thread pool for registration and removal: when a client calls the api/registry or
        // api/registryRemove interface, the request is processed on this pool (for registry or remove)
        registryOrRemoveThreadPool = new ThreadPoolExecutor(
                2,
                10,
                30L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
                    }
                },
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        r.run();
                        logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
                    }
                });

        // Start the registration monitor thread (for monitor)
        registryMonitorThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!toStop) {
                    try {
                        // Get the executor groups that use automatic registration
                        // (executor address type: 0 = automatic registration, 1 = manual entry) (auto registry group)
                        List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
                        if (groupList != null && !groupList.isEmpty()) {  // the group list is not empty

                            // Remove dead addresses: no heartbeat for more than 90 seconds is treated as dead
                            // (a heartbeat is sent every 30s by default) (remove dead address, admin/executor)
                            List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
                            if (ids != null && ids.size() > 0) {  // remove the registrations that are dead
                                XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                            }

                            // fresh online address (admin/executor)
                            HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                            // Find all registered addresses that are alive
                            List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
                            if (list != null) {
                                for (XxlJobRegistry item : list) {
                                    // Only handle registrations of type EXECUTOR
                                    if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                                        String appname = item.getRegistryKey();
                                        List<String> registryList = appAddressMap.get(appname);
                                        if (registryList == null) {
                                            registryList = new ArrayList<String>();
                                        }

                                        if (!registryList.contains(item.getRegistryValue())) {
                                            registryList.add(item.getRegistryValue());
                                        }
                                        appAddressMap.put(appname, registryList);
                                    }
                                }
                            }

                            // Refresh the registered addresses of each group (fresh group address)
                            for (XxlJobGroup group : groupList) {
                                List<String> registryList = appAddressMap.get(group.getAppname());
                                String addressListStr = null;
                                if (registryList != null && !registryList.isEmpty()) {
                                    Collections.sort(registryList);
                                    StringBuilder addressListSB = new StringBuilder();
                                    for (String item : registryList) {
                                        addressListSB.append(item).append(",");
                                    }
                                    addressListStr = addressListSB.toString();
                                    addressListStr = addressListStr.substring(0, addressListStr.length() - 1);
                                }
                                group.setAddressList(addressListStr);
                                group.setUpdateTime(new Date());

                                XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
                            }
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
            }
        });
        registryMonitorThread.setDaemon(true);
        registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
        registryMonitorThread.start();
    }

4 Start the failed-task monitor thread (retry, alarm)
This part of the logic is relatively simple: retry plus alarm. The core code is as follows:

    // Get the IDs of the job logs that failed to execute
    List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
    if (failLogIds != null && !failLogIds.isEmpty()) {
        for (long failLogId : failLogIds) {

            // lock log
            int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
            if (lockRet < 1) {
                continue;
            }
            XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
            XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

            // 1. Failure retry (fail retry monitor)
            if (log.getExecutorFailRetryCount() > 0) {
                JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount() - 1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
                String retryMsg = "<span style=\"color:#F39C12;\"> >>>>>>>>>>>" + I18nUtil.getString("jobconf_trigger_type_retry") + "<<<<<<<<<<< </span>";
                log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
                XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
            }

            // 2. Failure alarm (fail alarm monitor)
            int newAlarmStatus = 0;  // Alarm status: 0 = default, -1 = locked, 1 = no alarm required, 2 = alarm succeeded, 3 = alarm failed
            if (info != null && info.getAlarmEmail() != null && info.getAlarmEmail().trim().length() > 0) {
                boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
                newAlarmStatus = alarmResult ? 2 : 3;
            } else {
                newAlarmStatus = 1;
            }

            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
        }
    }

5 Start the monitoring thread
5.1 Initialize the callbackThreadPool thread pool: when a client calls the api/callback interface, the callback is processed on this thread pool
5.2 Start the monitor thread monitorThread: if a scheduling record stays in the "running" state for more than 10 minutes and the corresponding executor's heartbeat registration has lapsed (the executor is offline), the scheduler marks that record as failed
The logic is simple and amounts to the two points above.

6 Start the log statistics and cleanup thread logrThread
- Refresh the log report for the last three days (daily counts of failures, successes, runs, and so on)
- Once a day, clear invalid and expired log data
Configuration parameter: xxl.job.logretentiondays=30 sets the retention period of the xxl-job database logs; if the value is less than 7, logs are never cleared
The logic is simple and amounts to the two points above.

7 Start task scheduling

7.1 scheduleThread: fetch the jobs that are about to run and put them into the time wheel
- Step 1: Use select for update on a database table as a distributed lock, so that multiple xxl-job admin scheduler nodes do not schedule at the same time
- Step 2: Pre-read data. Read from the database the jobs that are due within the next five seconds, with a page size of preReadCount (6000 rows with the default settings)
    preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
- Step 3: Compare the current time with the job's next trigger time. There are three cases:
    * The current time is later than (next trigger time + PRE_READ_MS (5s)). This can happen when the scan or query takes too long, so the job is already more than five seconds overdue by the time it is processed and needs special handling:
        1. Match the misfire strategy: DO_NOTHING = do nothing and discard the missed trigger; FIRE_ONCE_NOW = trigger once immediately
        2. Refresh the last trigger time and the next trigger time
    * The current time is later than the next trigger time, but by no more than five seconds:
        1. Trigger the job directly
        2. Refresh the last trigger time and the next trigger time
        3. If the new next trigger time is within five seconds, put the job straight into the time wheel:
            1. Compute which second of the minute the next trigger time falls on (ringSecond)
            2. Put the job ID into the time wheel under ringSecond
            3. Refresh the last trigger time and the next trigger time
    * The current time has not yet reached the next trigger time:
        1. Compute which second of the minute the next trigger time falls on (ringSecond)
        2. Put the job ID into the time wheel under ringSecond
        3. Refresh the last trigger time and the next trigger time
- Step 4: Update the job's trigger information in the database, such as trigger_last_time and trigger_next_time
- Step 5: Commit the database transaction, which releases the select for update exclusive lock

7.2 ringThread: trigger jobs according to the time wheel
The time wheel data structure is: Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>()
- Step 1: Get the current second within the minute, then loop twice. The second iteration picks up the jobs of the previous slot (tick) in case that slot was skipped, so that no slot is missed
- Step 2: Execute the trigger
- Step 3: Clear the list of jobs taken from the current slot
    * A strategy is selected while the trigger executes:
        Blocking strategy: serial execution, discard later, cover earlier
        Routing strategy: first, last, minimum distribution, consistent hash, fast failure, LFU (least frequently used), LRU (least recently used), random, round robin

The core source code that starts the two threads is as follows:

    public void start() {

        // Scheduling thread: fetches due jobs and pushes them into the time wheel (schedule thread)
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    // Sleep for 4-5 seconds before starting; the subtraction makes the wake-up land on a
                    // whole-second boundary (I do not know why the initial delay is needed)
                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);
                } catch (InterruptedException e) {
                    if (!scheduleThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

                // pre-read count: threadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

                while (!scheduleThreadToStop) {

                    // Scan jobs
                    long start = System.currentTimeMillis();

                    Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;

                    boolean preReadSuc = true;
                    try {

                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        conn.setAutoCommit(false);

                        // select ... for update takes an exclusive lock. In other words, xxl-job uses a
                        // database table as a distributed lock so that, with multiple xxl-job admin nodes,
                        // only one scheduling thread runs at any given time
                        preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
                        preparedStatement.execute();

                        // tx start

                        // 1. Pre-read data (pre read)
                        long nowTime = System.currentTimeMillis();
                        // Read from the database the jobs due within the next five seconds,
                        // at most preReadCount rows (6000 with the default settings)
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList != null && scheduleList.size() > 0) {
                            // 2. Push into the time wheel (push time-ring)
                            for (XxlJobInfo jobInfo : scheduleList) {

                                // time-ring jump
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                    // The current time is later than (next trigger time + PRE_READ_MS (5s)): the
                                    // scan or query may have taken too long, so the job is more than five seconds
                                    // overdue and needs special handling
                                    // 2.1 trigger-expire > 5s: pass && make next-trigger-time
                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                    // 1. Match the misfire strategy: DO_NOTHING = do nothing and discard;
                                    //    FIRE_ONCE_NOW = trigger once immediately (misfire match)
                                    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                        // FIRE_ONCE_NOW: trigger
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger: jobId = " + jobInfo.getId());
                                    }

                                    // 2. Refresh the last trigger time and the next trigger time (fresh next)
                                    refreshNextValidTime(jobInfo, new Date());

                                } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                    // The current time is later than the next trigger time, but by no more than 5s
                                    // 2.2 trigger-expire < 5s: direct-trigger && make next-trigger-time

                                    // 1. Trigger the job directly (trigger)
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger: jobId = " + jobInfo.getId());

                                    // 2. Refresh the last trigger time and the next trigger time (fresh next)
                                    refreshNextValidTime(jobInfo, new Date());

                                    // If the new next trigger time is within five seconds, put the job straight
                                    // into the time wheel (next-trigger-time in 5s, pre-read again)
                                    if (jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                                        // 1. Compute the second of the minute of the next trigger time (make ring second)
                                        int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);

                                        // 2. Put the job ID into the time wheel under ringSecond (push time ring)
                                        pushTimeRing(ringSecond, jobInfo.getId());

                                        // 3. Refresh the last trigger time and the next trigger time (fresh next)
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                                    }

                                } else {
                                    // The current time has not yet reached the next trigger time
                                    // 2.3 trigger-pre-read: time-ring trigger && make next-trigger-time

                                    // 1. Compute the second of the minute of the next trigger time (make ring second)
                                    int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);

                                    // 2. Put the job ID into the time wheel under ringSecond (push time ring)
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3. Refresh the last trigger time and the next trigger time (fresh next)
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                                }
                            }

                            // 3. Update the job's trigger information in the database, such as
                            //    trigger_last_time and trigger_next_time (update trigger info)
                            for (XxlJobInfo jobInfo : scheduleList) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            }

                        } else {
                            preReadSuc = false;
                        }

                        // tx stop

                    } catch (Exception e) {
                        if (!scheduleThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                        }
                    } finally {
                        // Commit the transaction, which releases the select for update lock (commit)
                        // .......................omitted.............
                    }
                    long cost = System.currentTimeMillis() - start;

                    // If the scan finished quickly, sleep for a while (wait seconds, align second)
                    if (cost < 1000) {  // scan-overtime, not wait
                        try {
                            // pre-read period: success > scan each second; fail > skip this period;
                            TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);
                        } catch (InterruptedException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                }
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();

        // Time wheel thread: takes out the data of each second and processes it (ring thread)
        ringThread = new Thread(new Runnable() {
            @Override
            public void run() {

                while (!ringThreadToStop) {

                    // align second
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                    try {
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        // Get the current second within the minute, then loop twice: the second iteration
                        // picks up the jobs of the previous slot in case processing took too long and a
                        // slot was skipped
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
                        for (int i = 0; i < 2; i++) {
                            List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

                        // ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat: " + nowSecond + " = " + Arrays.asList(ringItemData));
                        if (ringItemData.size() > 0) {
                            // do trigger
                            for (int jobId : ringItemData) {
                                // Execute the trigger (do trigger)
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            }
                            // Clear the list of jobs taken from the current slot (clear)
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
    }
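
The time wheel itself is small enough to isolate. The sketch below is a simplified, single-threaded illustration of the idea and not the xxl-job class. It keeps the same Map<Integer, List<Integer>> shape and the same ringSecond formula shown above, pushes job IDs into a slot, and drains the current slot together with the previous one. The class TimeWheelSketch and its methods push and drain are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical illustration only; not part of xxl-job. */
final class TimeWheelSketch {

    // Key: second of the minute (0-59). Value: IDs of the jobs due at that second.
    private final Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    /** Second of the minute on which a trigger time (epoch milliseconds) falls. */
    static int ringSecond(long triggerNextTimeMs) {
        return (int) ((triggerNextTimeMs / 1000) % 60);
    }

    /** Producer side: register a job ID under its slot. */
    void push(int ringSecond, int jobId) {
        ringData.computeIfAbsent(ringSecond, k -> new ArrayList<>()).add(jobId);
    }

    /**
     * Consumer side: remove and return the jobs of the current slot and of the
     * previous slot, so a slot that was skipped by a slow tick is still processed.
     */
    List<Integer> drain(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ringData.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }

    public static void main(String[] args) {
        TimeWheelSketch wheel = new TimeWheelSketch();
        wheel.push(ringSecond(59_000L), 1); // slot 59
        wheel.push(ringSecond(60_000L), 2); // slot 0 of the next minute
        // Draining at second 0 also picks up the leftover job in slot 59.
        System.out.println(wheel.drain(0));
    }
}
```

Adding 60 before taking the remainder keeps the index non-negative when nowSecond is 0, which is why slot 59 is reached as the "previous" slot across a minute boundary. The sketch assumes a single caller; in the scheduler the producer and consumer are separate threads.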