调度中心源码分享
XxlJobScheduler
public void init() throws Exception {// init i18ninitI18n();// admin trigger pool startJobTriggerPoolHelper.toStart();// admin registry monitor runJobRegistryHelper.getInstance().start();// admin fail-monitor runJobFailMonitorHelper.getInstance().start();// admin lose-monitor run ( depend on JobTriggerPoolHelper )JobCompleteHelper.getInstance().start();// admin log report startJobLogReportHelper.getInstance().start();// start-schedule ( depend on JobTriggerPoolHelper )JobScheduleHelper.getInstance().start();logger.info(">>>>>>>>> init xxl-job admin success.");}
JobTriggerPoolHelper: 快慢线程池
/*** add trigger*/public void addTrigger(final int jobId,final TriggerTypeEnum triggerType,final int failRetryCount,final String executorShardingParam,final String executorParam,final String addressList) {// choose thread poolThreadPoolExecutor triggerPool_ = fastTriggerPool;AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 mintriggerPool_ = slowTriggerPool;}// triggertriggerPool_.execute(new Runnable() {@Overridepublic void run() {long start = System.currentTimeMillis();try {// do triggerXxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);} catch (Throwable e) {logger.error(e.getMessage(), e);} finally {// check timeout-count-maplong minTim_now = System.currentTimeMillis() / 60000;if (minTim != minTim_now) {minTim = minTim_now;jobTimeoutCountMap.clear();}// incr timeout-count-map// 计算执行时间 、然后分配快慢线程池的执行任务long cost = System.currentTimeMillis() - start;if (cost > 500) { // ob-timeout threshold 500msAtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));if (timeoutCount != null) {timeoutCount.incrementAndGet();}}}}@Overridepublic String toString() {return "Job Runnable, jobId:" + jobId;}});}
JobRegistryHelper : 执行器地址注册和移除、以及在在线地址探测和监控。
JobFailMonitorHelper: 获取失败的任务、根据重试次数判断是否需要执行,并通过邮件告警
JobCompleteHelper: 执行器任务完成后,回调调度中心的线程

JobLogReportHelper: 记录统计信息生成报表、并定时清理日志。
JobScheduleHelper
private Thread scheduleThread;private Thread ringThread;private volatile boolean scheduleThreadToStop = false;private volatile boolean ringThreadToStop = false;
// 时间轮数据private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();public void start() {// schedule threadscheduleThread = new Thread(new Runnable() {@Overridepublic void run() {try {TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);} catch (Throwable e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>> init xxl-job admin scheduler success.");// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)// 触发器 每秒能处理的消息数量 20个 (1秒内 50毫秒) 计算预读数量int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;while (!scheduleThreadToStop) {// Scan Joblong start = System.currentTimeMillis();Connection conn = null;Boolean connAutoCommit = null;PreparedStatement preparedStatement = null;boolean preReadSuc = true;try {conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();connAutoCommit = conn.getAutoCommit();conn.setAutoCommit(false);// 数据库行锁 ,通过select for update的方式添加一个悲观锁,可以确保在同一时刻,只能有一个事务获取到锁。// 分布式环境下公用一个db的话、只有一个节点能获取到preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");preparedStatement.execute();// tx start// 1、pre readlong nowTime = System.currentTimeMillis();// 每次循环预读取未来5秒内需要执行的任务(PRE_READ_MS = 5000)List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);if (scheduleList != null && scheduleList.size() > 0) {// 2、push time-ringfor (XxlJobInfo jobInfo : scheduleList) {// time-ring jump// 过期超过5秒的任务( 对到达now时间后的任务(超出now在5秒外))if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {// 2.1、trigger-expire > 5s:pass && make next-trigger-timelogger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());// 1、misfire match 如果是这个策略则立即触发否则不做任何事情MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {// FIRE_ONCE_NOW 》 triggerJobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());}// 2、fresh next 计算下次执行时间refreshNextValidTime(jobInfo, new Date());} else if (nowTime > jobInfo.getTriggerNextTime()) { // 过期小于5秒的任务 (对到达now时间后的任务(超出now在 5秒内))// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time// 1、trigger 立即触发JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());// 2、fresh next 计算下次执行时间refreshNextValidTime(jobInfo, new Date());// next-trigger-time in 5s, pre-read again、如下次执行在五秒内 ,则加入时间轮执行if (jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {// 1、make ring secondint ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);// 2、push time ringpushTimeRing(ringSecond, jobInfo.getId());// 3、fresh nextrefreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else { // 未来5秒内要执行的任务、 对未到达now时间的任务// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time// 1、make ring secondint ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);// 2、push time ringpushTimeRing(ringSecond, jobInfo.getId());// 3、fresh nextrefreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}// 3、update trigger infofor (XxlJobInfo jobInfo : scheduleList) {XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);}} else {preReadSuc = false;}// tx stop} catch (Throwable e) {if (!scheduleThreadToStop) {logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);}} finally {// commit 执行完 提交事务,释放锁。(本质是一个分布式锁的实现)if (conn != null) {try {conn.commit();} catch (Throwable e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {conn.setAutoCommit(connAutoCommit);} catch (Throwable e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {conn.close();} catch (Throwable e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}// close PreparedStatementif (null != preparedStatement) {try {preparedStatement.close();} catch (Throwable e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}}long cost = System.currentTimeMillis() - start;// Wait seconds, align secondif (cost < 1000) { // scan-overtime, not waittry {// 成功扫描后休眠1秒继续下一轮 、 如果没有任务则休眠5秒(PRE_READ_MS) 、并校准时间// pre-read period: success > scan each second; fail > skip this period;TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);} catch (Throwable e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");}});scheduleThread.setDaemon(true);scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");scheduleThread.start();// ring threadringThread = new Thread(new Runnable() {@Overridepublic void run() {while (!ringThreadToStop) {// align secondtry {// 每秒执行一次,时间与系统时间同步 时间对齐TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);} catch (Throwable e) {if (!ringThreadToStop) {logger.error(e.getMessage(), e);}}try {// second dataList<Integer> ringItemData = new ArrayList<>();int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;// 获取当前秒数及前一秒的时间轮数据(避免处理时间过长导致错过执行时机for (int i = 0; i < 2; i++) {List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);if (tmpData != null) {ringItemData.addAll(tmpData);}}// ring triggerlogger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData));if (ringItemData.size() > 0) {// do triggerfor (int jobId : ringItemData) {// do trigger 对获取到的所有任务ID,调用JobTriggerPoolHelper.trigger()触发执行JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);}// clear 执行完的任务从时间轮中移除ringItemData.clear();}} catch (Throwable e) {if (!ringThreadToStop) {logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");}});ringThread.setDaemon(true);ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");ringThread.start();}private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) {try {Date nextValidTime = generateNextValidTime(jobInfo, fromTime);if (nextValidTime != null) {jobInfo.setTriggerStatus(-1); // pass, may be InaccuratejobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());jobInfo.setTriggerNextTime(nextValidTime.getTime());} else {// generateNextValidTime fail, stop jobjobInfo.setTriggerStatus(0);jobInfo.setTriggerLastTime(0);jobInfo.setTriggerNextTime(0);logger.error(">>>>>>>>>>> xxl-job, refreshNextValidTime fail for job: jobId={}, scheduleType={}, scheduleConf={}",jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf());}} catch (Throwable e) {// generateNextValidTime error, stop jobjobInfo.setTriggerStatus(0);jobInfo.setTriggerLastTime(0);jobInfo.setTriggerNextTime(0);logger.error(">>>>>>>>>>> xxl-job, refreshNextValidTime error for job: jobId={}, scheduleType={}, scheduleConf={}",jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf(), e);}
}private void pushTimeRing(int ringSecond, int jobId) {// push async ringList<Integer> ringItemData = ringData.get(ringSecond);if (ringItemData == null) {ringItemData = new ArrayList<Integer>();ringData.put(ringSecond, ringItemData);}ringItemData.add(jobId);logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData));
}
定时触发任务是如何实现的
xxl_job_info表是记录定时任务的db表,里面有个trigger_next_time(Long)字段,表示下一次触发的时间点任务时间被修改 / 每一次任务触发后,可以根据cronb表达式计算下一次触发时间戳:
Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date()))
更新trigger_next_time字段。
定时执行任务逻辑:
- 定时任务
scheduleThread:不断从db把5秒内要执行的任务读出,立即触发 / 放到时间轮等待触发,并更新trigger_next_time - 获取当前时间
now - 轮询
db,找出trigger_next_time在距now 5秒内的任务
3.1 对到达now时间后的任务(超出now 在 5秒外)
(1) 直接跳过不执行;
(2) 重置trigger_next_time
3.2 对到达now时间后的任务(超出now在5秒内)
(1) 开线程执行触发逻辑;
(2) 若任务下一次触发时间是在5秒内,则放到时间轮内(Map<Integer, List>秒数(1-60) => 任务id列表);
(3) 重置trigger_next_time
3.3 对未到达now时间的任务
(1)直接放到时间轮内;
(2)重置trigger_next_time - 定时任务
ringThread:时间轮实现到点触发任务
4.1 时间轮数据结构:Map<Integer, List<Integer>> key是秒数(1-60),value是任务id列表 - 获取当前时间秒数
- 从时间轮内移出当前秒数前2个秒数(避免处理耗时太长,跨过刻度,向前校验一个刻度)的任务列表id,一一触发任务;
如何避免集群中的多个服务器同时调度任务
当xxl-job应用本身集群部署(实现高可用HA)时,如何避免集群中的多个服务器同时调度任务?
通过mysql悲观锁实现分布式锁(for update语句)
setAutoCommit(false)关闭隐式自动提交事务,启动事务select lock for update(显式排他锁,其他事务无法进入&无法实现for update)- 读
db任务信息 -> 拉任务到内存时间轮 -> 更新db任务信息 commit提交事务,同时会释放for update的排他锁(悲观锁)
XxlJobTrigger
public class XxlJobTrigger {private static Logger logger = LoggerFactory.getLogger(XxlJobTrigger.class);/*** trigger job** @param jobId* @param triggerType* @param failRetryCount >=0: use this param* <0: use param from job info config* @param executorShardingParam* @param executorParam null: use job param* not null: cover job param* @param addressList null: use executor addressList* not null: cover*/public static void trigger(int jobId,TriggerTypeEnum triggerType,int failRetryCount,String executorShardingParam,String executorParam,String addressList) {// load dataXxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);if (jobInfo == null) {logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);return;}if (executorParam != null) {jobInfo.setExecutorParam(executorParam);}// 获取重试次数int finalFailRetryCount = failRetryCount >= 0 ? failRetryCount : jobInfo.getExecutorFailRetryCount();XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());// cover addressList ,覆盖调用地址if (addressList != null && addressList.trim().length() > 0) {group.setAddressType(1);group.setAddressList(addressList.trim());}// sharding paramint[] shardingParam = null;if (executorShardingParam != null) {//分片参数String[] shardingArr = executorShardingParam.split("/");if (shardingArr.length == 2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {shardingParam = new int[2];shardingParam[0] = Integer.valueOf(shardingArr[0]);shardingParam[1] = Integer.valueOf(shardingArr[1]);}}// 如果是分片广播模式if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)&& group.getRegistryList() != null && !group.getRegistryList().isEmpty()&& shardingParam == null) {// 则将所有的执行器地址每个发一次for (int i = 0; i < group.getRegistryList().size(); i++) {// shardIndex =>i shardTotal =》 group.getRegistryList().size()processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());}} else {// 没有分片参数if (shardingParam == null) {shardingParam = new int[]{0, 1};}// 执行processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);}}private static boolean isNumeric(String str) {try {int result = Integer.valueOf(str);return true;} catch (NumberFormatException e) {return false;}}/*** @param group job group, registry list may be empty* @param jobInfo* @param finalFailRetryCount* @param triggerType* @param index sharding index* @param total sharding index*/private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total) {// paramExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy// 路由策略ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategyString shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) ? String.valueOf(index).concat("/").concat(String.valueOf(total)) : null;// 1、save log-idXxlJobLog jobLog = new XxlJobLog();jobLog.setJobGroup(jobInfo.getJobGroup());jobLog.setJobId(jobInfo.getId());jobLog.setTriggerTime(new Date());XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());// 2、init trigger-paramTriggerParam triggerParam = new TriggerParam();triggerParam.setJobId(jobInfo.getId());triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());triggerParam.setExecutorParams(jobInfo.getExecutorParam());triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());triggerParam.setLogId(jobLog.getId());triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());triggerParam.setGlueType(jobInfo.getGlueType());triggerParam.setGlueSource(jobInfo.getGlueSource());triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());triggerParam.setBroadcastIndex(index);triggerParam.setBroadcastTotal(total);// 3、init addressString address = null;ReturnT<String> routeAddressResult = null;if (group.getRegistryList() != null && !group.getRegistryList().isEmpty()) {if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {if (index < group.getRegistryList().size()) {address = group.getRegistryList().get(index);} else {address = group.getRegistryList().get(0);}} else {// 如果不是分片广播,则通过路由算法进行选择一个地址routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {address = routeAddressResult.getContent();}}} else {routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));}// 4、trigger remote executorReturnT<String> triggerResult = null;if (address != null) {//执行任务triggerResult = runExecutor(triggerParam, address);} else {triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);}// 收集信息和存储// 5、collection trigger infoStringBuffer triggerMsgSb = new StringBuffer();triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":").append((group.getAddressType() == 0) ? I18nUtil.getString("jobgroup_field_addressType_0") : I18nUtil.getString("jobgroup_field_addressType_1"));triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());if (shardingParam != null) {triggerMsgSb.append("(" + shardingParam + ")");}triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>" + I18nUtil.getString("jobconf_trigger_run") + "<<<<<<<<<<< </span><br>").append((routeAddressResult != null && routeAddressResult.getMsg() != null) ? routeAddressResult.getMsg() + "<br><br>" : "").append(triggerResult.getMsg() != null ? triggerResult.getMsg() : "");// 6、save log trigger-infojobLog.setExecutorAddress(address);jobLog.setExecutorHandler(jobInfo.getExecutorHandler());jobLog.setExecutorParam(jobInfo.getExecutorParam());jobLog.setExecutorShardingParam(shardingParam);jobLog.setExecutorFailRetryCount(finalFailRetryCount);//jobLog.setTriggerTime();jobLog.setTriggerCode(triggerResult.getCode());jobLog.setTriggerMsg(triggerMsgSb.toString());XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());}/*** run executor** @param triggerParam* @param address* @return*/public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {ReturnT<String> runResult = null;try {// 9999 执行器的客户端 9999端口(内嵌服务),调用run ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);//执行器执行run 后 返回结果 runResult = executorBiz.run(triggerParam);} catch (Exception e) {logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));}StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");runResultSB.append("<br>address:").append(address);runResultSB.append("<br>code:").append(runResult.getCode());runResultSB.append("<br>msg:").append(runResult.getMsg());runResult.setMsg(runResultSB.toString());return runResult;}}
如何实现任务执行器的路由?
- 第一个、最后一个、轮询、随机:都是简单读
address_list即可 - 一致性HASH:
TreeSet实现一致性hash算法 - 最不经常使用、最近最久未使用:
HashMap、LinkedHashMap - 故障转移:遍历
address_list获取address时,逐个检查该address的心跳(请求返回状态);只有心跳正常的address才返回使用 - 忙碌转移:遍历
address_list获取address时,逐个检查该address是否忙碌(请求返回状态);只有状态为idle的address才返回使用
如何实现任务分片、并行执行?
- 拉出任务的执行机器列表,逐个设置
index / total,把index / total分发到任务执行器 - 任务执行器可根据
index / total参数开发分片任务
