表结构分析
xxl_job_lock:任务调度锁表;
xxl_job_group:执行器信息表,维护任务执行器信息;
xxl_job_info:调度扩展信息表:用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等;
xxl_job_log:调度日志表:用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;
xxl_job_log_report:调度日志报表:用户存储XXL-JOB任务调度日志的报表,调度中心报表功能页面会用到;
xxl_job_logglue:任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能;
xxl_job_registry:执行器注册表,维护在线的执行器和调度中心机器地址信息;
xxl_job_user:系统用户表;
脚本如下:
查看代码
#
# XXL-JOB
# Copyright (c) 2015-present, xuxueli.CREATE database if NOT EXISTS `xxl_job` default character set utf8mb4 collate utf8mb4_unicode_ci;
use `xxl_job`;SET NAMES utf8mb4;CREATE TABLE `xxl_job_info`
(`id` int(11) NOT NULL AUTO_INCREMENT,`job_group` int(11) NOT NULL COMMENT '执行器主键ID',`job_desc` varchar(255) NOT NULL,`add_time` datetime DEFAULT NULL,`update_time` datetime DEFAULT NULL,`author` varchar(64) DEFAULT NULL COMMENT '作者',`alarm_email` varchar(255) DEFAULT NULL COMMENT '报警邮件',`schedule_type` varchar(50) NOT NULL DEFAULT 'NONE' COMMENT '调度类型',`schedule_conf` varchar(128) DEFAULT NULL COMMENT '调度配置,值含义取决于调度类型',`misfire_strategy` varchar(50) NOT NULL DEFAULT 'DO_NOTHING' COMMENT '调度过期策略',`executor_route_strategy` varchar(50) DEFAULT NULL COMMENT '执行器路由策略',`executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',`executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',`executor_block_strategy` varchar(50) DEFAULT NULL COMMENT '阻塞处理策略',`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',`executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',`glue_type` varchar(50) NOT NULL COMMENT 'GLUE类型',`glue_source` mediumtext COMMENT 'GLUE源代码',`glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE备注',`glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间',`child_jobid` varchar(255) DEFAULT NULL COMMENT '子任务ID,多个逗号分隔',`trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行',`trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次调度时间',`trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次调度时间',PRIMARY KEY (`id`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;CREATE TABLE `xxl_job_log`
(`id` bigint(20) NOT NULL AUTO_INCREMENT,`job_group` int(11) NOT NULL COMMENT '执行器主键ID',`job_id` int(11) NOT NULL COMMENT '任务,主键ID',`executor_address` varchar(255) DEFAULT NULL COMMENT '执行器地址,本次执行的地址',`executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',`executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',`executor_sharding_param` varchar(20) DEFAULT NULL COMMENT '执行器任务分片参数,格式如 1/2',`executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',`trigger_time` datetime DEFAULT NULL COMMENT '调度-时间',`trigger_code` int(11) NOT NULL COMMENT '调度-结果',`trigger_msg` text COMMENT '调度-日志',`handle_time` datetime DEFAULT NULL COMMENT '执行-时间',`handle_code` int(11) NOT NULL COMMENT '执行-状态',`handle_msg` text COMMENT '执行-日志',`alarm_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '告警状态:0-默认、1-无需告警、2-告警成功、3-告警失败',PRIMARY KEY (`id`),KEY `I_trigger_time` (`trigger_time`),KEY `I_handle_code` (`handle_code`),KEY `I_jobid_jobgroup` (`job_id`,`job_group`),KEY `I_job_id` (`job_id`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;CREATE TABLE `xxl_job_log_report`
(`id` int(11) NOT NULL AUTO_INCREMENT,`trigger_day` datetime DEFAULT NULL COMMENT '调度-时间',`running_count` int(11) NOT NULL DEFAULT '0' COMMENT '运行中-日志数量',`suc_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行成功-日志数量',`fail_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行失败-日志数量',`update_time` datetime DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;CREATE TABLE `xxl_job_logglue`
(`id` int(11) NOT NULL AUTO_INCREMENT,`job_id` int(11) NOT NULL COMMENT '任务,主键ID',`glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE类型',`glue_source` mediumtext COMMENT 'GLUE源代码',`glue_remark` varchar(128) NOT NULL COMMENT 'GLUE备注',`add_time` datetime DEFAULT NULL,`update_time` datetime DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;CREATE TABLE `xxl_job_registry`
(`id` int(11) NOT NULL AUTO_INCREMENT,`registry_group` varchar(50) NOT NULL,`registry_key` varchar(255) NOT NULL,`registry_value` varchar(255) NOT NULL,`update_time` datetime DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `i_g_k_v` (`registry_group`, `registry_key`, `registry_value`) USING BTREE
) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;CREATE TABLE `xxl_job_group`
(`id` int(11) NOT NULL AUTO_INCREMENT,`app_name` varchar(64) NOT NULL COMMENT '执行器AppName',`title` varchar(12) NOT NULL COMMENT '执行器名称',`address_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '执行器地址类型:0=自动注册、1=手动录入',`address_list` text COMMENT '执行器地址列表,多地址逗号分隔',`update_time` datetime DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;CREATE TABLE `xxl_job_user`
(`id` int(11) NOT NULL AUTO_INCREMENT,`username` varchar(50) NOT NULL COMMENT '账号',`password` varchar(50) NOT NULL COMMENT '密码',`role` tinyint(4) NOT NULL COMMENT '角色:0-普通用户、1-管理员',`permission` varchar(255) DEFAULT NULL COMMENT '权限:执行器ID列表,多个逗号分割',PRIMARY KEY (`id`),UNIQUE KEY `i_username` (`username`) USING BTREE
) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;CREATE TABLE `xxl_job_lock`
(`lock_name` varchar(50) NOT NULL COMMENT '锁名称',PRIMARY KEY (`lock_name`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;## —————————————————————— init data ——————————————————INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`, `update_time`)
VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL, '2018-11-03 22:21:31');INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`,`schedule_type`, `schedule_conf`, `misfire_strategy`, `executor_route_strategy`,`executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`,`executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`,`child_jobid`)
VALUES (1, 1, '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'CRON', '0 0 0 * * ? *','DO_NOTHING', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化','2018-11-03 22:21:31', '');INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`)
VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL);INSERT INTO `xxl_job_lock` (`lock_name`)
VALUES ('schedule_lock');commit;
server-worker 调度模型
调度中心和执行器 两个模块之间通讯是 server-worker 模式。调度中心本身就是一个SpringBoot 工程,启动会监听8080端口。
执行器启动后,会启动内置服务( EmbedServer )监听9994端口。这样双方都可以给对方发送命令。
那调度中心如何知道执行器的地址信息呢 ?上图中,执行器会定时发送注册命令 ,这样调度中心就可以获取在线的执行器列表。
关于执行器调用方式源码分享
com.xxl.job.core.thread.ExecutorRegistryThread : 执行器注册线程。
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {//com.xxl.job.core.biz.client.AdminBizClient // adminBiz , 直接通过http方式将本地机器的服务器地址写入到调度中心ReturnT<String> registryResult = adminBiz.registry(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Throwable e) {logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);}}
com.xxl.job.core.biz.client.AdminBizClient
@Override
public ReturnT<String> registry(RegistryParam registryParam) {return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler: 内置服务 任务处理器
这个是处理 admin 服务过来的请求 (调度中心)
try {switch (uri) {case "/beat":return executorBiz.beat();case "/idleBeat":// 判断是当前jobThread是否空闲IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);return executorBiz.idleBeat(idleBeatParam);case "/run":// 核心执行逻辑TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);return executorBiz.run(triggerParam);case "/kill":// 删除jobTheadKillParam killParam = GsonTool.fromJson(requestData, KillParam.class);return executorBiz.kill(killParam);case "/log":// 获取执行器这边任务的日志LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);return executorBiz.log(logParam);default:return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");}} catch (Throwable e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));}
包含三个属性
比较重要的是执行器的run 方法。
主要是处理各类任务类型, 绑定JobThread 和 Jobhandler 的关系,然后将传递到triggerQueue中。
com.xxl.job.core.thread.JobThread#run
// 获取调度参数triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);// 调度上下文参数XxlJobContext xxlJobContext = new XxlJobContext(triggerParam.getJobId(),triggerParam.getExecutorParams(),logFileName,triggerParam.getBroadcastIndex(),triggerParam.getBroadcastTotal());// 判断是否有超时时间 if (triggerParam.getExecutorTimeout() > 0) {// limit timeoutThread futureThread = null;try {FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);// 具体jobhandle 执行handler.execute();return true;}});futureThread = new Thread(futureTask);futureThread.start();Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);} catch (TimeoutException e) {XxlJobHelper.log("<br>----------- xxl-job job execute timeout");XxlJobHelper.log(e);// handle resultXxlJobHelper.handleTimeout("job execute timeout ");} finally {futureThread.interrupt();}} else {// just execute 具体jobhandle 执行handler.execute();}.... // 省略 日志处理finally {// 执行完毕将 if (triggerParam != null) {// callback handler info// 如果任务没有停止if (!toStop) {// 设置 TriggerCallbackThread 回调线程// 放入回调队里中去TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.getXxlJobContext().getHandleCode(),XxlJobContext.getXxlJobContext().getHandleMsg()));} else {// is killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason + " [job running, killed]"));}}}
上述代码分析如下图示:
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(0,200,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");}});