RocketMQ入门:基本概念、安装、本地部署与集群部署 - 详解
基本概念
RocketMQ是阿里巴巴开源的一个分布式消息中间件,拥护高吞吐、高可用、可扩展的消息传递,也就是大家常见的消息队列。它最初用于淘宝、天猫等核心业务。
核心角色
Producer(生产者)
- 负责发送消息到 Broker。
- 支持同步、异步、单向(Oneway)三种发送模式。
Consumer(消费者)
- 从 Broker 拉取或接收消息并处理。
- 分为 Push Consumer(Broker 推消息给消费者)和Pull Consumer(消费者主动拉取)。
- 可以以 集群模式(消息只被一个消费者实例消费)或广播模式(消息被所有消费者实例消费)工作。
Broker(消息服务器)
- 存储消息并提供读写服务。
- 一个 Broker 行包含多个Topic,每个 Topic 又划分为多个Queue。
- 通常部署在集群中,支持主从架构(Master/Slave)保证高可用。
NameServer(命名服务)
- 管理 Broker 的路由信息,提供轻量级的服务发现和负载均衡。
- 无状态,可集群部署,节点之间互不通信。
核心概念
Topic(主题)
- 消息的逻辑分类,用于区分不同业务的消息。
- 生产者发送消息时指定 Topic,消费者订阅 Topic 来接收消息。
Queue(队列)
- Topic 物理上的分区,一个 Topic 能够有多个 Queue。
- 消息在 Queue 中是有序的,但跨 Queue 不保证全局有序。
Message(消息)
- 消息的最小传递单元,涵盖:
- Tag:用于在同一个 Topic 内进一步过滤消息。
- Key:消息的唯一标识,便于查询和追踪。
- Body:消息的实际内容。
Offset(偏移量)
- 消费者在某个 Queue 上的消费进度标记。
- 行基于 Offset 回溯消费或跳过消息。
工作流程简述
- 启动 NameServer,等待 Broker 注册。
- Broker 启动,向所有 NameServer 注册自己的 Topic 路由信息。
- Producer连接 NameServer 获取 Broker 地址,然后向 Broker 发送消息。
- Consumer连接 NameServer 获取 Topic 路由信息,从 Broker 拉取或接收消息。
- 消息被消费后,Consumer 提交 Offset,Broker 保留消息直到过期或被清理。
以上只是简要介绍,我的【RocketMQ从入门到精通】通过都已经详细介绍上述角色功能和工作原理,有需要的能够了解一下。
单节点MQ本地部署
安装RocketMQ
最初我们是在Linux环境下部署的RocketMQ,你需要事先准备好你的Linux虚拟机。
然后我们需要安装RocketMQ的源码包,安装的版本为官网最新的5.3.2版本,你可以在官网找到,也可以直接点击这里直接进行安装。安装完成后我建议直接将源码包的安装路径设为用户路径。
进入到你安装RocketMQ的路径下,在Linux控制台依次输入以下的命令,解压源码包并编译构建二进制可执行文件
$ unzip rocketmq-all-5.3.2-source-release.zip
$ cd rocketmq-all-5.3.2-source-release/
$ mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
$ cd distribution/target/rocketmq-5.3.2/rocketmq-5.3.2
启动NameServer
RocketMQ已经安装完成,接下来需要启动NameServer
### 启动namesrv
$ nohup sh bin/mqnamesrv &
### 验证namesrv是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
启动Broker与Proxy
最后启动Broker与Proxy,启动成功之后我们单节点的RocketMQ已经本地部署成功了
### 先启动broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
broker-a就是### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 继而名字
$ tail -f ~/logs/rocketmqlogs/proxy.log
The broker[broker-a,192.169.1.2:10911] boot success...
tail跟踪时出现
说明Broker已经启动成功
集群部署
由于 Broker 只有一个节点,一旦Broker重启或者宕机时,会导致整个服务不可用。因此我们需要集群部署保证我们消息队列的可用性与安全性。
接下来将说明如何搭建一个双主双从异步复制的Broker集群
这里我们使用两台虚拟机来完成集群部署
1 . rocketmqOS1 192.168.59.164 NameServer + Broker Master1 + Slave2
2 . rocketmqOS2 192.168.59.165 NameServer + Broker Master2 + Slave1
ip地址可以根据自身虚拟机的ip地址来决定,但一定要保证两个虚拟机的ip地址是不同的。
1. 修改网络ip地址
先将 rocketmqOS1创建出来,然后在OS1中输入指令
vim /etc/sysconfig/network-scripts/ifcfg-ens33
找到IPADDR一列修改ip地址后重启
2. 修改OS1配置文件
进入到conf目录
其中会有这些文件,我们要搭建的是双主双从异步复制,因此进入2m-2s-async
其中会有broker.a,broker.b的master与slave的配置文件
修改broker-a.properties
RocketMQ集群的名称 brokerClusterName=DefaultCluster就是# 指定整个broker集群的名称,或者说
# 指定master-slave集群的名称。一个RocketMQ集群可以包含多个master-slave集群 brokerName=broker-a
# master的brokerId为0
brokerId=0
# 指定删除消息存储过期文件的时间为凌晨4点
deleteWhen=04
# 指定未发生更新的消息存储文档的保留时长为48小时,48小时后过期,将会被删除
fileReservedTime=48
# 指定当前broker为异步复制
master brokerRole=ASYNC_MASTER
# 指定刷盘策略为异步刷盘
flushDiskType=ASYNC_FLUSH
# 指定Name Server的地址
namesrvAddr=192.168.59.164:9876;192.168.59.165:9876
修改broker-b-s.properties
brokerClusterName=DefaultCluster
# 指定这是另外一个master-slave集群
brokerName=broker-b
# slave的brokerId为非0
brokerId=1 deleteWhen=04 fileReservedTime=48
# 指定当前broker为slave
brokerRole=SLAVE flushDiskType=ASYNC_FLUSH namesrvAddr=192.168.59.164:9876;192.168.59.165:9876
# 指定Broker对外提供服务的端口,即Broker与producer与consumer通信的端口。默认 10911。由于当前主机同时充当着master1与slave2,而前面的master1使用的是默认端口。这 里需将这两个端口加以区分,以区分出master1与slave2 listenPort=11911 # 指定消息存储相关的路径。默认路径为~/store目录。由于当前主机同时充当着master1与 slave2,master1利用的是默认路径,这里就得再指定一个不同路径
storePathRootDir=~/store-s
storePathCommitLog=~/store-s/commitlog storePathConsumeQueue=~/store-s/consumequeue
storePathIndex=~/store-s/index
storeCheckpoint=~/store-s/checkpoint
abortFile=~/store-s/abort
3. 修改OS2配置文件
进入配置文件的步骤与上文一样
修改broker-b.properties
brokerClusterName=DefaultCluster
brokerName=broker-b brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.59.164:9876;192.168.59.165:9876
修改broker-a-s.properties
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=192.168.59.164:9876;192.168.59.165:9876
listenPort=11911
storePathRootDir=~/store-s
storePathCommitLog=~/store-s/commitlog
storePathConsumeQueue=~/store-s/consumequeue
storePathIndex=~/store-s/index
storeCheckpoint=~/store-s/checkpoint
abortFile=~/store-s/abort
4. 启动服务器
启动NameServer
分别启动两个虚拟器RocketMQ的NameServer,步骤与上文相同
启动Master
分别启动rocketmqOS1与rocketmqOS2两个主机中的broker master,指定不同的加载配置文件。
OS1:
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties &
OS2:
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties &
启动Slave
分别启动rocketmqOS1与rocketmqOS2两个主机中的broker slave,指定不同的加载配置文件。
OS1:
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &
OS2:
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &
5. mqadmin
在mq解压目录的bin目录下有一个mqadmin命令,该命令是一个运维指令,用于对mq的主题,集群,broker 等信息进行管理。
修改bin/tools.sh配置
在运行mqadmin命令之前,先要修改mq解压目录下bin/tools.sh部署的JDK的ext目录位置。本机的ext 目录在/usr/java/jdk1.8.0_161/jre/lib/ext 。
使用vim命令打开tools.sh档案,并在JAVA_OPT配置的-Djava.ext.dirs这一行的后面添加ext的路径。
JAVA_OPT="${JAVA_OPT} - Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ ext:/usr/java/jdk1.8.0_161/jre/lib/ext"
运行mqadmin
从官网文章可以了解相关的命令