当前位置: 首页 > news >正文

XXL-JOB-源码分享(1)

 自研调度组件并支持集群部署,可保证调度中心HA;

 

com.xxl.job.core.executor.XxlJobExecutor#initEmbedServer 初始化内部调度服务

查看代码
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {// fill ip portport = port>0?port: NetUtil.findAvailablePort(9999);ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();// generate addressif (address==null || address.trim().length()==0) {String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is nulladdress = "http://{ip_port}/".replace("{ip_port}", ip_port_address);}// accessTokenif (accessToken==null || accessToken.trim().length()==0) {logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");}// startembedServer = new EmbedServer();embedServer.start(address, port, appname, accessToken);}

开启服务(netty 实现)

定义业务线程池

public void start(final String address, final int port, final String appname, final String accessToken) {executorBiz = new ExecutorBizImpl();thread = new Thread(new Runnable() {@Overridepublic void run() {// paramEventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();// 自定义业务线程池ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(0,200,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");}});try {// start serverServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle.addLast(new HttpServerCodec()).addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));}}).childOption(ChannelOption.SO_KEEPALIVE, true);// bindChannelFuture future = bootstrap.bind(port).sync();logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);// start registrystartRegistry(appname, address);// wait util stopfuture.channel().closeFuture().sync();} catch (InterruptedException e) {logger.info(">>>>>>>>>>> xxl-job remoting server stop.");} catch (Throwable e) {logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);} finally {// stoptry {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();} catch (Throwable e) {logger.error(e.getMessage(), e);}}}});thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leavethread.start();}

EmbedHttpServerHandler  内部处理器 、主要处理来自admi 服务以http方式对执行器服务的调用。主要包括心跳、执行、停止、日志等接口。

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {// validif (HttpMethod.POST != httpMethod) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");}if (uri == null || uri.trim().length() == 0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");}if (accessToken != null&& accessToken.trim().length() > 0&& !accessToken.equals(accessTokenReq)) {return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");}// services mappingtry {switch (uri) {case "/beat":return executorBiz.beat();case "/idleBeat":IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);return executorBiz.idleBeat(idleBeatParam);case "/run":TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);return executorBiz.run(triggerParam);case "/kill":KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);return executorBiz.kill(killParam);case "/log":LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);return executorBiz.log(logParam);default:return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");}} catch (Throwable e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));}}

该执行器是实现为: com.xxl.job.core.biz.impl.ExecutorBizImpl

自动注册执行器地址(执行器会周期性自动注册任务,调度中心将会自动发现注册的任务并触发执行):

public void startRegistry(final String appname, final String address) {// start registryExecutorRegistryThread.getInstance().start(appname, address);}

image

http://www.hskmm.com/?act=detail&tid=10613

相关文章:

  • ctfshow web入门 SSRF
  • C#中避免GC压力和提高性能的8种技术
  • ctfshow web入门 爆破
  • 函数内联
  • 7. Innodb底层原理与Mysql日志机制深入剖析
  • 深入解析:HSA35NV001美光固态闪存NQ482NQ470
  • ERP和MES、WMS、CRM,到底怎么配合 - 智慧园区
  • YOLO实战应用 1YOLOv5 架构与模块
  • YOLO实战应用 2数据准备与增强
  • Day18稀疏数组
  • 底层
  • YOLO实战应用 3训练与优化策略
  • WPF 视图缩略图控件(支持缩放调节与拖拽定位)
  • ik中文分词器使用
  • 动态水印也能去除?ProPainter一键视频抠图整合包下载
  • SpringBoot整合RustFS:全方位优化文件上传性能
  • windows使用es-client插件
  • AI学习日记 - 实践
  • es中的端点
  • 解码C语言宏
  • es中的索引
  • es中的数据类型
  • 防御安全播客第214期:数据泄露与漏洞攻防实战
  • windows使用kibana
  • 03作业
  • 软工作业个人项目
  • YOLO进阶提升 5标注与配置
  • rapidxml中接口函数
  • YOLO进阶提升 6模型训练与测试
  • YOLO进阶提升 4训练准备与数据处理