深入理解Zookeeper与Kafka:分布式系统中的协调与消息队列
- 前言
- 一、Zookeeper
- 1.1 Zookeeper概述
- 1.2 Zookeeper工作机制
- 1.3 Zookeeper数据结构
- 1.4 Zookeeper应用场景
- 1.5 Zookeeper选举机制
- 1.6 部署Zookeeper集群
- 二、Kafka
- 2.1 为什么需要消息队列(MQ)
- 2.2 使用消息队列的好处
- 2.3 消息队列的两种模式
- 2.4 Kafka定义
- 2.5 Kafka简介
- 2.6 Kafka的特性
- 2.7 Kafka系统架构
- 2.8 部署Kafka集群
- 2.9 Kafka架构深入
- 2.10 Filebeat+Kafka+ELK部署
- 结语
前言
在当今的分布式系统中,协调服务和消息队列是两个不可或缺的组件。它们在确保系统的高可用性、一致性和可扩展性方面发挥着至关重要的作用。Zookeeper和Kafka作为这两个领域的代表性工具,分别在分布式协调和消息队列中扮演着重要角色。本文将深入探讨Zookeeper和Kafka的各个方面,包括它们的概述、工作机制、数据结构、应用场景、选举机制以及部署方法。
一、Zookeeper
1.1 Zookeeper概述
Zookeeper是一个分布式协调服务,专为分布式应用提供高效可靠的协调、同步、配置管理和故障恢复等功能。它的设计目的是简化分布式系统的管理,保证多个节点之间的数据一致性和协调工作。Zookeeper提供了类似文件系统的层次化命名空间,用来存储和管理元数据,确保分布式应用的高可用性和强一致性。
1.2 Zookeeper工作机制
从设计模式角度来看,Zookeeper是一个基于观察者模式设计的分布式服务管理框架。它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就会负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。简而言之,Zookeeper=文件系统+通知机制。
1.3 Zookeeper数据结构
Zookeeper的数据结构类似于一个层次化的文件系统。ZNode是Zookeeper中存储数据的基本单元,每个ZNode都可以存储少量的数据,并且可以有子节点,形成树状结构。ZNode分为持久节点、临时节点和顺序节点。持久节点会一直存在,直到手动删除;临时节点在客户端会话断开时自动删除;顺序节点在创建时会自动添加递增的编号。
1.4 Zookeeper应用场景
Zookeeper提供的服务包括统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。它在分布式环境下帮助实现对应用/服务的统一命名、配置文件的同步、节点状态的实时监控以及服务器的动态上下线管理。
1.5 Zookeeper选举机制
Zookeeper的选举机制确保集群中的所有节点对外表现为一个统一的服务。选举机制分为两个阶段:Leader选举和投票确认。第一次启动选举机制和非第一次启动选举机制的流程有所不同,前者涉及多个服务器的启动和投票,后者则涉及Leader的故障和重新选举。
1.6 部署Zookeeper集群
部署Zookeeper集群需要准备环境、安装JDK、下载和安装Zookeeper、配置Zookeeper、启动和检查集群状态。具体步骤包括关闭防火墙、安装JDK、下载Zookeeper安装包、修改配置文件、创建数据目录和日志目录、配置启动脚本、设置开机自启以及启动Zookeeper。
systemctl stop firewalld
systemctl disable firewalld
setenforce 0
rpm -ivh jdk-8u231-linux-x64.rpm
#配置环境变量
vim /etc/profile
export JAVA_HOME=/usr/java/jdk1.8.0_231-amd64
export JAVA_BIN=/usr/java/jdk1.8.0_231-amd64/bin
export PATH=${JAVA_HOME}/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
tar zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7
cd /usr/local/zookeeper-3.5.7/conf/
cp zoo_sample.cfg zoo.cfg
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
#
dataDir=/usr/local/zookeeper-3.5.7/data
dataLogDir=/usr/local/zookeeper-3.5.7/logs
server.1=192.168.10.11:3188:3288
server.2=192.168.10.12:3188:3288
server.3=192.168.10.13:3188:3288
#
mkdir /usr/local/zookeeper-3.5.7/data
mkdir /usr/local/zookeeper-3.5.7/logs
#3台ID创建
echo 1 > /usr/local/zookeeper-3.5.7/data/myid
echo 2 > /usr/local/zookeeper-3.5.7/data/myid
echo 3 > /usr/local/zookeeper-3.5.7/data/myid
service zookeeprt start
service zookeeper status
二、Kafka
2.1 为什么需要消息队列(MQ)
在高并发环境下,同步请求可能导致系统压力过大,消息队列通过异步处理请求缓解系统压力。消息队列常应用于异步处理、流量削峰、应用解耦、消息通讯等场景。
2.2 使用消息队列的好处
消息队列提供了系统解耦、可恢复性、缓冲、灵活性和峰值处理能力、异步通信等好处。它允许独立的扩展或修改处理过程,降低进程间的耦合度,控制和优化数据流速度,处理突发流量,并提供异步处理机制。
2.3 消息队列的两种模式
消息队列有两种模式:点对点模式和发布/订阅模式。点对点模式是一对一的,消费者主动拉取数据,消息收到后消息清除;发布/订阅模式是一对多的,消费者消费数据之后不会清除消息。
2.4 Kafka定义
Kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。它最初由LinkedIn公司开发,是一个分布式、支持分区的、多副本的、基于Zookeeper协调的分布式消息中间件系统。
2.5 Kafka简介
Kafka具有高吞吐量、低延迟、可扩展性、持久性、可靠性、容错性、高并发等特性。它每秒可以处理几十万条消息,延迟最低只有几毫秒,支持数千个客户端同时读写。
2.6 Kafka的特性
Kafka的特性包括高吞吐量、低延迟、可扩展性、持久性、可靠性、容错性、高并发等。它通过分区和副本机制实现数据的扩展和可靠性。
2.7 Kafka系统架构
Kafka系统架构包括Broker服务器、Topic主题、Partition分区、Producer生产者、Consumer消费者、Consumer Group消费者组、Offset偏移量、Zookeeper等组件。每个组件在Kafka中扮演着不同的角色,共同实现消息的生产和消费。
2.8 部署Kafka集群
部署Kafka集群需要下载安装包、安装Kafka、修改配置文件、配置环境变量、配置启动脚本、设置开机自启以及启动Kafka。具体步骤包括下载Kafka安装包、解压和移动安装包、修改配置文件、配置环境变量、配置启动脚本、设置开机自启以及启动Kafka。
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka
cd /usr/local/kafka/config/
vim server.properties
broker.id=0 #21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=1、broker.id=2
listeners=PLAINTEXT://192.168.10.18:9092 #指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改
log.dirs=/usr/local/kafka/logs #60行,kafka运行日志存放的路径,也是数据存放的路径
zookeeper.connect=192.168.10.11:2181,192.168.10.12:2181,192.168.10.13:2181 #配置连接Zookeeper集群地址
vim /etc/init.d/kafka
#设置启动文件
#!/bin/bash
#chkconfig:2345 22 88
#description:kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)
echo "------KAFKA启动------"
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;
;
stop)
echo "------KAFKA停止------"
${KAFKA_HOME}/bin/kafka-server-stop.sh
;
;
restart)
$0 stop
$0 start
;
;
status)
echo "------KAFKA运行状态------"
count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
if [ "$count" -eq 0];
then
ehco "kafka is not running"
else
echo "kafka is running"
fi
;
;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
#设置开机自启
chmod +x /etc/init.d/kafka
chkconfig --add kafka
#分别启动 Kafka
service kafka start
#创建topic
kafka-topics.sh --create --zookeeper 192.168.10.11:2181,192.168.10.12:2181,192.168.10.13:2181 --replication-factor 2 --partitions 3 --topic test
#查看当前服务器中的所有 topic
kafka-topics.sh --list --zookeeper 192.168.10.11:2181,192.168.10.12:2181,192.168.10.13:2181
#查看某个 topic 的详情
kafka-topics.sh --describe --zookeeper 192.168.10.11:2181,192.168.10.12:2181,192.168.10.13:2181
#发布消息
kafka-console-producer.sh --broker-list 192.168.10.11:9092,192.168.10.12:9092,192.168.10.13:9092 --topic test
#消费消息
kafka-console-consumer.sh --bootstrap-server 192.168.10.11:9092,192.168.10.12:9092,192.168.10.13:9092 --topic httpd --from-beginning
#修改分区数
kafka-topics.sh --zookeeper 192.168.10.11:2181,192.168.10.12:2181,192.168.10.13:2181 --alter --topic test --partitions 6
#删除 topic
kafka-topics.sh --delete --zookeeper 192.168.10.11:2181,192.168.10.12:2181,192.168.10.13:2181 --topic test
2.9 Kafka架构深入
Kafka的工作流程及文件存储机制包括消息的分类、分区、日志文件的分片和索引机制。数据可靠性保证通过ACK机制实现,数据一致性问题通过Leader和Follower的同步机制解决。
2.10 Filebeat+Kafka+ELK部署
Filebeat+Kafka+ELK部署包括前提部署Zookeeper+Kafka集群、部署Filebeat、部署ELK、Kibana添加等步骤。通过这些步骤,可以实现日志的收集、传输、存储和可视化。
cd /usr/local/filebeat
vim filebeat.yml
#上层次
filebeat.prospectors:
- type: log
enabled: true
paths:
- /var/log/httpd/access_log
tags: ["access"]
- type: log
enabled: true
paths:
- /var/log/httpd/error_log
tags: ["error"]
......
#添加输出到 Kafka 的配置
output.kafka:
enabled: true
hosts: ["192.168.10.18:9092","192.168.10.21:9092","192.168.10.22:9092"] #指定 Kafka 集群配置
topic: "httpd"
filebeat test config -c /etc/filebeat/filebeat.yml #
systemctl start filebeat
./filebeat -e -c filebeat.yml
--------------------------------------------------------------------------------------------------------------
cd /etc/logstash/conf.d/
vim kafka.conf
input {
kafka {
bootstrap_servers =>
"192.168.10.18:9092,192.168.10.21:9092,192.168.10.22:9092" #kafka集群地址
topics =>
"httpd" #拉取的kafka的指定topic
type =>
"httpd_kafka" #指定 type 字段
codec =>
"json" #解析json格式的日志数据
auto_offset_reset =>
"latest" #拉取最近数据,earliest为从头开始拉取
decorate_events =>
true #传递给elasticsearch的数据额外增加kafka的属性数据
}
}
output {
if "access" in [tags] {
elasticsearch {
hosts =>
["192.168.10.10:9200"]
index =>
"httpd_access-%{+YYYY.MM.dd}"
}
}
if "error" in [tags] {
elasticsearch {
hosts =>
["192.168.10.10:9200"]
index =>
"httpd_error-%{+YYYY.MM.dd}"
}
}
stdout { codec => rubydebug
}
}
#启动 logstash
logstash -f kafka.conf
结语
Zookeeper和Kafka在分布式系统中扮演着重要角色。Zookeeper通过提供分布式协调服务,确保系统的高可用性和一致性;Kafka通过提供高效的消息队列服务,实现系统的异步处理和流量削峰。通过本文的深入探讨,希望读者能够更好地理解和使用这两个工具,为构建高效、可靠的分布式系统打下坚实的基础。