当前位置: 首页 > news >正文

完整教程:zk管理kafkakafka-broker通信

完整教程:zk管理kafka&kafka-broker通信

系列文章目录


一、作用和不足

作用

2.8版本以前,kafka使用zk管理并存储kafka的关键元数据,每个kafka集群都有broker作为控制器,是zk watcher 选举产生的。controller不仅像其他普通节点一样存储主体分区日志和处理消费生成数据请求,而且还维护着kafka集群元数据,比如broker id,主题分区,leader, isr信息。它将这些信息持久化到zk中。

0.9版本以前,controller通过单线程循环的方式,将更新后的元数据同步给其他broker节点,生产者和消费者都需要直接连接到zk集群进行发送和消费数据,现在这种方式已经被通过 bootstrap.servers连接的方式取代

不足

在一个健康的zk集群中,每时每刻只有一个健康的leader节点,zk必须保证半数以上的节点可用,才能保证zk集群可用。浪费资源。

zk和kafka是两个不同的系统,运维工作加倍。

controller节点存在单点故障。

controller宕机的话,新controller 需要从zk拉取所有分区的元数据信息,向每个broker节点发送leaderAndISR和updateMetaData请求,时间复杂度是O(num.partitions)。

broker宕机的话, 受控关机时间长,尤其是某个broker节点上分布的分区leader非常多时,需要重选leader,重新拉取元数据信息,分区越多重启时间越长,时间复杂度是O(partitions.leader)。

二、zk全部宕机后,kafka集群能正常收发消息吗?

能够正常收发消息。但是在kafka的server log里面,会报错:xxxx拒绝连接
原因是,kafka0.9以后,zk的功能被弱化,用来帮助kafka集群选举controller节点等基本操作,zk节点宕机后,kafka集群只是不能再发现新的broker节点加入了,但是不影响既有的节点收发日志。

三、controller功能和如何选举

  1. 分区 Leader 选举
    当某个 Broker 宕机或重新加入时,Controller 负责重新选举该 Broker 上的分区 Leader。
    确保每个分区都有一个 Leader 和多个 Follower(副本)。
  2. 分区分配与重平衡
    在集群扩容、Broker 故障恢复等情况下,Controller 会重新分配分区到不同的 Broker 上,以实现负载均衡。
    控制 Rebalance(再平衡)过程,确保所有副本同步。
  3. 元数据管理
    维护并更新集群的元数据信息,如:
    Topic 的分区数量
    分区的 Leader 和副本信息
    Broker 的状态(在线/离线)
    Controller 自己的状态(主/备)
  4. 监控 Broker 状态
    检测 Broker 是否正常运行。
    如果某个 Broker 失联,Controller 会触发相应的处理逻辑(如重新分配分区)。
  5. 控制副本同步
    确保副本之间的数据同步,防止数据丢失

集群的启动中,如何选出controller: 抢占机制

集群运行中,一些broker节点压力过大,导致controller反应慢,由于没有超时,zk无法判断是否下线,但是导致某些分区选主超时,可以删除controller临时节点,zk触发controller重新选主。健康的broker节点抢先注册成为controller, epoch任期号是2.
path: /controller path:/controller_epoch

value:{broker: 1} value:2

四、kafka发送数据到client

向broker节点传输数据是通过networkclient完成的。kafka客户端的networkclient类使用java nio 实现与broker节点通信,源代码可以看Selector类,

producer获取元数据信息,判断需要发送的topic分区位于哪个broker节点,selector和具体的broker节点创建一个socket 连接。
this.nioSelector = java.nio.channels.Selector.open();

具体是如何连接的呢?看源码

public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
//1.检查是否和Node重复建立连接,确保每个channel当下只能有一个Node连接
ensureNotRegistered(id);
//2.创建SocketChannel对象
SocketChannel socketChannel = SocketChannel.open();
SelectionKey key = null;
try {
/**
* 3.配置SocketChannel对象
* 将SocketChannel设置为非阻塞模式,keepAlive设置为true,TCP_NODELAY设置为true
* 指定发送和接收缓冲区的大小(如果不是默认值)
* */
configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
/**
* 4.确认连接是否真正建立
* 因为是非阻塞模式,scoketChannel.connect()只是尝试发起一个连接请求,并不保证连接一定建立成功
* connect()返回true表示连接已经建立成功,返回false表示连接请求已经发出但还没有建立成功
* 因此boolean connected会立即返回连接状态,但是并一定是true
* */
boolean connected = doConnect(socketChannel, address);
/**
* 5.注册SocketChannel对象到Selector
* 注册感兴趣的事件为OP_CONNECT,表示当连接建立成功时,Selector会收到通知
* */
key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
if (connected) {
// OP_CONNECT won't trigger for immediately connected channels
log.debug("Immediately connected to node {}", id);
//6.1 将key加入immediatelyConnectedKeys集合,表示该连接已经建立成功
immediatelyConnectedKeys.add(key);
//6.2 将key的感兴趣事件清空,因为连接已经建立成功,不需要再监听OP_CONNECT事件
key.interestOps(0);
}
} catch (IOException | RuntimeException e) {
if (key != null)
immediatelyConnectedKeys.remove(key);
channels.remove(id);
socketChannel.close();
throw e;
}
}

Selector类中还有一个poll()方法,获取准备就绪的SelectionKey,代表的是当前SocketChannel上已经就绪的时间,然后就可以 遍历这些SelectionKey来执行在interestSets中的兴趣操作
在KafkaProducer和KafkaConsumer中,负责与Broker通信的线程会重复调用NetworkClient的poll(), 在poll()中,会执行selector的select()方法,并以IO多路复用的方式处理与Broker之间的通信

if (numReadyKeys >
0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
//获取所有准备就绪的SelectionKey
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

sender子线程将封装好的发送请求缓存在InFlightRequests中,然后由NIO请求将消息发送到broker
在这里插入图片描述

InFlightRequests中的一个Deque中可以缓存多个sender创建好的ProduceRequest。“飞行中的请求”指的是请求发出去,尚未收到broker端的响应
其中Deque大小由参数max.in.flight.requests.per.connection决定,默认是5.如果将幂等性参数设置为true,broker会按照请求的先后顺序处理produceRequest;反之如果设置为false,只能将max.in.flight.requests.per.connection设置为1,才能保证顺序发送消息

http://www.hskmm.com/?act=detail&tid=18939

相关文章:

  • 域泛化DomainBed的评价指标含义解释
  • JUC: 线程锁
  • 上证指数历年每月涨跌统计 - Leone
  • InteractiveCommunication Problems
  • JSON 框架混用避坑指南:FastJSON vs Jackson
  • 企业级大数据技术栈:基于Hadoop+Spark的全球经济指标分析与可视化环境实践
  • 若邻接矩阵是三角矩阵,则存在拓扑序列;反之则不一定成立
  • 20250927Sat VIM 在函数内部任一行,按 [[ 即跳转到函数的开头
  • macOS 多 Java 版本管理(jenv 方案)
  • 软件技术基础第一次课程
  • 石子合并(一排的和一个环的)
  • 思维题练习
  • NXP - 用MCUXpresso IDE导入lpcopen_2_10_lpcxpresso_nxp_lpcxpresso_1769.zip中的工程 - 教程
  • spatial项目的主要领导者斯坦福大学ppl实验室的 Kunle Olukotun 教授和 Christos Kozyrakis 教授
  • 程序语言杂谈:概述
  • 字符串基础
  • 在CodeBolcks下wxSmith的C++编程教程——使用 wxGrid
  • 题解:P12479 [集训队互测 2024] 长野原龙势流星群
  • linux下nginx
  • 9.27
  • OI 笑传 #12
  • spatial芯片设计语言 学习笔记
  • 【C++】23. C++11(上) - 教程
  • kali2025搭建ARL灯塔系统
  • 实用指南:AI 术语通俗词典:LLM(大语言模型)
  • java学习 2025-9-27
  • 题解:P11667 [USACO25JAN] Astral Superposition B
  • 北极通讯网络题解(做题记录)
  • elasticsearch安装插件 - 实践
  • 个人学习——前端react项目框架