Skip to content

RocketMQ 基础

RocketMQ系统架构简介

RocketMQ 由 NameServer、Broker、Producer、Consumer 四大组件组成

NameServer(注册中心)

一个轻量级的注册中心集群,无状态、高可用。它管理着所有 Broker 的元数据信息(如地址、Topic 分布等),为 Producer 和 Consumer 提供服务发现功能。各组件通过心跳机制与 NameServer 保持通信。

Broker(消息服务器)

RocketMQ 的核心消息存储与转发节点,负责接收 Producer 发来的消息、存储消息、以及为 Consumer 提供消息拉取服务。通常以主从(Master-Slave)模式部署,支持高可用和故障切换。

Producer(生产者)

负责创建并发送消息到 Broker。生产者在启动时会向 NameServer 注册,并定期从 NameServer 获取 Broker 的路由信息,从而直接与对应的 Broker 建立连接进行消息投递。

Consumer(消费者)

用于从 Broker 订阅并消费消息。消费者同样会向 NameServer 注册,并拉取 Broker 的路由信息,然后主动从指定的 Broker 拉取消息进行处理。

总结

RocketMQ整个系统架构中,NameServer 作为中枢协调者,解耦了 Producer/Consumer 与 Broker 的直接依赖;Broker 集群承担实际的消息读写,并通过主从复制保障数据可靠性。这种设计使得 RocketMQ 具备高吞吐、低延迟和强扩展性。

RocketMQ 安装与配置

RocketMQ 安装

环境要求

运行 RocketMQ 需要满足以下基本条件:安装 JDK 1.8 或更高版本(推荐使用 JDK 17),因为 RocketMQ 是一个基于 Java 构建的分布式消息中间件;同时需运行在 64位操作系统 上,并确保系统拥有 至少 1GB 可用内存。其中,JDK 版本的选择至关重要,建议优先选用稳定且支持长期维护的 JDK 17,以保证性能与兼容性。其余如操作系统架构和内存配置也均为必要前提,缺一不可

Windows环境

1.下载 RocketMQ 安装包

访问 RocketMQ 官网 下载二进制安装包,找到 Binary下载 列选择对应版本进行下载, 推荐 5.3.0 (或更高版本) 并下载。将下载的 ZIP 文件解压到一个没有中文和空格的目录,例如 D:\SoftWare\RocketMQ\rocketmq-all-5.3.0-bin-release。我们将这个目录称为 %ROCKETMQ_HOME%

2.修改 RocketMQ 配置

说明

默认配置在 Linux 下通常设定为 8G,这在 Windows 开发机上往往会导致启动失败(内存不足)。

  • 修改 JVM 内存配置(重要)

    • 进入 %ROCKETMQ_HOME%\bin 目录。
      • 编辑 runserver.cmd (NameServer 启动脚本),找到 set JAVA_OPT=%JAVA_OPT% -server -Xms8g -Xmx8g ...,修改为较小的值,例如:set JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx256m
      • 编辑 runbroker.cmd (Broker 启动脚本),找到 set JAVA_OPT=%JAVA_OPT% -server -Xms8g -Xmx8g ...,修改为:set JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx256m
  • 修改 Broker 配置文件 (可选)

    • 打开 %ROCKETMQ_HOME%\conf\broker.conf。如果需要外网访问或指定 IP,可添加或修改:brokerIP1 = 你的本机IP。默认情况下,Broker 会尝试自动获取本地 IP,通常无需修改即可在本地测试使用。
3.配置环境变量
  • 为了在任何目录下都能运行命令,建议配置系统环境变量
  • 新建系统变量 ROCKETMQ_HOME,值为你的解压路径 (如 D:\SoftWare\RocketMQ\rocketmq-all-5.3.0-bin-release)。
  • %ROCKETMQ_HOME%\bin 添加到系统的 Path 变量中。
  1. 右键「此电脑」→「属性」 环境变量配置1
  2. 「高级系统设置」 环境变量配置2
  3. 「环境变量」 环境变量配置3
  4. 系统变量区域,点击「新建」:
  • 变量名:ROCKETMQ_HOME
  • 变量值:RocketMQ 安装路径(如 D:\SoftWare\RocketMQ\rocketmq-all-5.3.0-bin-release)。 环境变量配置4
  1. 编辑系统变量中的 Path
  • 点击「新建」,添加 %ROCKETMQ_HOME%\bin
  • 点击「确定」保存所有设置。 环境变量配置5
4.启动 RocketMQ 服务
  • 启动 NameServer

    • 打开命令行 (CMD),进入 %ROCKETMQ_HOME%\bin 目录。
    • 执行命令:
      cmd
      start mqnamesrv.cmd

    注意

    不要直接双击运行,建议使用 start 命令或在命令行运行,以便查看日志窗口。启动成功后会看到一个名为 "Apache RocketMQ Name Server" 的窗口。

  • 启动 Broker

    • 打开一个新的命令行窗口,进入 %ROCKETMQ_HOME%\bin 目录。
    • 执行命令(指定 NameServer 地址):
      cmd
      start mqbroker.cmd -n localhost:9876

    注意

    如果是集群或特定 IP 环境,localhost 可能需要替换为具体 IP。启动成功后会看到 "Apache RocketMQ Broker" 窗口。

5.验证安装
  • 观察两个命令行窗口,如果没有报错且持续运行,说明启动成功。
  • 可以查看 %ROCKETMQ_HOME%\logs\rocketmqlogs\ 目录下的日志文件确认详细信息。
6.停止服务
  • 直接关闭上述打开的两个命令行窗口即可停止服务。
  • 或者在命令行执行:
    cmd
    shutdown.cmd -n localhost:9876
    shutdown.cmd -b localhost:9876

Linux环境

1.下载

方式一: 官网下载并上传(推荐)

  1. 官网下载页 选择最新版本的二进制包下载
  1. 通过 SFTP 上传到服务器 上传

方式二: 使用 wget 命令直接下载(不推荐,非国内网站,下载过慢)

bash
# 新建并进入指定目录 (如果目录已存在则不会删除并新建, 会保留)
mkdir -p /opt/software
cd /opt/software/
wget https://archive.apache.org/dist/rocketmq/5.3.0/rocketmq-all-5.3.0-bin-release.zip

wget下载

2.解压
解压 rocketmq-all-5.3.0-bin-release.zip
shell
cd /opt/software/

# 解压 rocketmq-all-5.3.0-bin-release.zip 压缩包
unzip rocketmq-all-5.3.0-bin-release.zip

解压解压结果

3.修改默认配置

说明

RocketMQ 相关配置文件中初始堆大小和默认最大堆大小默认为4个g,一般在个人电脑 / 虚拟机上压力还是比较大,所以建议修改一下

生产环境不建议调整。这⼀系列参数实际上就是RocketMQ的JVM调优结果。

  1. runserver.sh

原始runserver.sh文件

修改 runserver.sh 配置
bash
cd /opt/software/rocketmq-all-5.3.0-bin-release

vim bin/runserver.sh

# 根据 JDK(JRE) 版本修改 runserver.sh 文件配置

# 如果 JDK(JVM) 版本 小于等于 9,需修改 89 行 -Xms4g(JVM 初始堆大小) -Xmx4g(JVM 最大堆大小) -Xmn2g(JVM 年轻代大小)配置项,建议修改为 -Xms512m -Xmx512m-Xmn256m,也可根据实际情况调整
-Xms512m -Xmx512m -Xmn256m

# 如果 JDK(JVM) 版本 大于 9,需修改 94 行 -Xms4g(JVM 初始堆大小) -Xmx4g(JVM 最大堆大小)配置项,建议修改为 -Xms512m -Xmx512m,也可根据实际情况调整
-Xms512m -Xmx512m -Xmn256m
  1. runbroker.sh

同理,调整 runbroker.sh 中的内存大小 原始runbroker.sh文件

修改 runbroker.sh 配置
bash
# 第 103 行
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
# 修改为:
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"
  1. tools.sh(可选)

如果需要使用 tools.sh 进行安装测试,则需修改 tools.sh 文件内配置的内存大小 原始tools.sh文件

修改 tools.sh 配置(可选)
bash
# 第 57 行
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
# 修改为:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
4.配置环境变量

为了方便在任何目录下执行 RocketMQ 命令,建议配置 ROCKETMQ_HOME 环境变量。

  1. 编辑配置文件
bash
vim /etc/profile
# 或者仅针对当前用户: vim ~/.bashrc
  1. 添加以下内容到文件末尾
bash
# NAMESRV_ADDR 环境变量(启动 Broker时,需要给 Broker 指定 对应的 NAMESRV 的地址)
export NAMESRV_ADDR=localhost:9876

# ROCKETMQ_HOME 环境变量 (可选但推荐)
export ROCKETMQ_HOME=/opt/software/rocketmq-all-5.3.0-bin-release
export PATH=$PATH:$ROCKETMQ_HOME/bin
  1. 使配置生效
bash
source /etc/profile
# 如果修改的是 .bashrc,则执行: source ~/.bashrc
  1. 验证环境变量
bash
echo $ROCKETMQ_HOME
# 应输出: /opt/software/rocketmq-all-5.3.0-bin-release

echo $NAMESRV_ADDR
# 应输出:localhost:9876
5.启动 RocketMQ 服务

RocketMQ 由两个核心组件组成:NameServer (注册中心) 和 Broker (消息代理)。必须先启动 NameServer,再启动 Broker。

  • 启动 NameServer 并验证是否启动成功

    启动 NameServer 服务
    shell
    # 进入 bin 目录
    cd $ROCKETMQ_HOME/bin
    
    # 启动 NameServer (后台运行)
    # nohup 表示忽略挂起信号,& 表示后台运行
    nohup sh mqnamesrv &
    验证 NameServer 是否启动成功
    shell
    # 查看进程
    jps -l | grep NamesrvStartup
    # 或者查看日志 (日志默认在 ~/logs/rocketmqlogs/namesrv.log)
    # 看到 `The Name Server boot success...` 字样即表示成功。
    tail -f ~/logs/rocketmqlogs/namesrv.log
  • 启动 Broker 并验证是否启动成功

    启动 Broker 服务
    shell
    # 进入 bin 目录
    cd $ROCKETMQ_HOME/bin
    
    # -n 参数指定 NameServer 的地址 (IP:端口),默认端口为 9876
    # 如果是单机测试,IP 可以是 localhost 或 127.0.0.1
    # 如果需要外网访问,建议指定服务器的真实 IP (例如: 192.168.x.x)
    nohup sh mqbroker -n localhost:9876 &
    验证 Broker 是否启动成功
    shell
    # 查看进程
    jps -l | grep BrokerStartup
    # 查看日志
    # 看到 `The broker[...] boot success...` 字样即表示成功。
    tail -f ~/logs/rocketmqlogs/broker.log
6.安装验证

RocketMQ 提供了简单的命令行工具来测试收发消息。

发送测试消息

发送测试消息
shell
# 进入 bin 目录
cd  $ROCKETMQ_HOME/bin

# 输出 `SendResult [sendStatus=SEND_OK, ...]` 表示发送成功。
sh tools.sh org.apache.rocketmq.example.quickstart.Producer

接收(消费)测试消息

接收(消费)测试消息
shell
# 进入 bin 目录
cd  $ROCKETMQ_HOME/bin

# 输出 `ConsumeMessageThread_x Receive New Messages ...` 表示接收成功。
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
7.停止服务

停止服务的顺序通常没有严格限制,但建议先停 Broker 再停 NameServer。

bash
cd  $ ROCKETMQ_HOME/bin

# 停止 Broker
sh mqshutdown broker

# 停止 NameServer
sh mqshutdown namesrv

其它

日志位置

  • 所有日志默认存储在用户主目录下的 ~/logs/rocketmqlogs/ 文件夹中。
  • namesrv.log: NameServer 日志
  • broker.log: Broker 日志

常见问题

Windows 本地 Java 程序无法连接到虚拟机 Linux 中的 RocketMQ NameServer

原因分析

  1. 虚拟机 Linux 系统的防火墙未开放 RocketMQ 核心端口,导致网络请求被拦截;
  2. RocketMQ 配置中 Broker/NameServer 绑定的是虚拟机内网 IP,Windows 无法访问;
  3. 代码中 NameServer 地址配置错误(如写死 localhost 而非虚拟机 IP)。

解决方案一:开放 Linux 防火墙端口(推荐)

RocketMQ 核心端口列表

NameServer:9876(TCP)- 客户端发现服务入口

Broker:10911(TCP)- 客户端收发消息主端口

Broker:10909(TCP)- VIP 通道(代码未禁用时需开放)

Broker:10912(TCP)- HA 主从同步(集群模式需开放)

bash
# 开放核心端口
firewall-cmd --permanent --add-port=9876/tcp
firewall-cmd --permanent --add-port=10911/tcp
firewall-cmd --permanent --add-port=10909/tcp
firewall-cmd --permanent --add-port=10912/tcp

# 重新加载防火墙配置使修改生效
firewall-cmd --reload

# 验证端口是否开放成功
firewall-cmd --list-ports

解决方案二:临时关闭防火墙(测试环境可选)

bash
# CentOS/RHEL 系统
systemctl stop firewalld
systemctl disable firewalld

# Ubuntu/Debian 系统
ufw disable
bash
# Ubuntu/Debian 系统
ufw disable

Java客户端项目搭建

之前的步骤实际上是在服务器上快速验证 RocketMQ 的服务状态,接下来我们动手搭建一个 RocketMQ 的客户端应用,在实际应用中集成并使用 RocketMQ。

方式一:Java客户端项目直接下载

方式二:具体搭建流程

POM 核心依赖

创建一个标准的 maven 项目,在 pom.xml 中引入一下核心依赖
xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.0</version>
</dependency>

生产者

创建一个简单的生产者
java
public class Producer {

    /**
     * The number of produced messages.
     */
    public static final int MESSAGE_COUNT = 2;
    public static final String PRODUCER_GROUP = "please_rename_unique_group_name";
    public static final String DEFAULT_NAMESRVADDR = "192.168.218.134:9876";
    public static final String TOPIC = "TopicTest";
    public static final String TAG = "TagA";

    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);

        /*
         * Specify name server addresses.
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         *  producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */
        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
        // producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

        /*
         * Launch the instance.
         */
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
//        producer.setNamesrvAddr("worker1:9876;worker2:9876;worker3:9876");
        producer.start();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message(TOPIC /* Topic */,
                    TAG /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                msg.setKeys("aaaa");

                /*
                 * Call send message to deliver message to one of brokers.
                 */
                producer.sendOneway(msg);
                SendResult sendResult = producer.send(msg, 20 * 1000);
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%s%n", sendResult);
                    }

                    @Override
                    public void onException(Throwable e) {
                        System.out.println(e.getMessage());
                    }
                });

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        /*
         * Shut down once the producer instance is no longer in use.
         */
        producer.shutdown();
    }
}

说明

运行其中的 main 方法,就会往 RocketMQ 中发送两条消息。在这个实现过程中,需要注意一下的是对于生产者,需要指定对应的 nameserver 服务的地址,这个地址需要指向自己的服务器。

消费者

创建一个消息消费者接收 RocketMQ 中的消息
java
/**
 * This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}.
 */
public class Consumer {

//    public static final String CONSUMER_GROUP = "newGroup";
    public static final String CONSUMER_GROUP = "please_rename_unique_group_name";
    public static final String DEFAULT_NAMESRVADDR = "192.168.218.134:9876";
    public static final String TOPIC = "TopicTest";

    public static void main(String[] args) throws MQClientException {

        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */
        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
         consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

        /*
         * Specify where to start in case the specific consumer group is a brand-new one.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /*
         * Subscribe one more topic to consume.
         */
        consumer.subscribe(TOPIC, "*");

        /*
         *  Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        /*
         *  Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

说明

运行其中的 main 方法后,就可以启动一个 RocketMQ 消费者,接收之前发到 RocketMQ 上的消息,并将消息内容打印出来。在这个实现过程中,需要重点关注的有两点。一是对于消费者,同样需要指定 nameserver 的地址。二是消费者需要在 RocketMQ 中订阅具体的 Topic,只有发送到这个 Topic 上的消息才会被这个消费者接收到。

RocketMQ可视化管理服务搭建

在之前的简单实验中,RocketMQ 都是以后台服务的方式在运行,我们并不很清楚 RocketMQ 是如何运行的。RocketMQ 的社区就提供了⼀个图形化的管理控制台 Dashboard,可以用可视化的方式直接观测并管理 RocketMQ 的运行过程。

1. Dashboard Jar 包下载

方式一:官网下载并打包

  1. 官网下载 Dashboard 项目(SpringBoot),Zip 压缩包

Dashboard 服务并不在 RocketMQ 的运行包中,需要到 RocketMQ 的官网下载页面 单独下载。

RocketMQ Dashboard

  1. 打包项目为可运行的 Jar 包
bash
# mvn clean package -DskipTests 核心是「清理 + 打包 + 跳过测试」
# clean 保证环境干净,package 完成编译打包,-DskipTests 跳过测试提升构建速度
# 区分 skipTests(只跳过测试执行)和 maven.test.skip=true(跳过测试编译 + 执行),按需使用
# -DskipTests 只会跳过测试执行,不会跳过测试代码的编译;如果想连测试代码编译都跳过,需要用 -Dmaven.test.skip=true
mvn clean package -DskipTests

打包结果

方式二:直接下载已打包 Jar 包

2. 核心配置 连接 NameServer 集群

在与 rocketmq-dashboard-2.0.0.jar 文件同级的目录下新建 application.properties 配置文件,配置 nameserver(注册中心) 的地址

properties
rocketmq.config.namesrvAddrs=127.0.0.1:9876

3. 启动 dashboard

bash
# java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar
# 使用 Java 运行一个可执行的 JAR 包,这里运行的是 RocketMQ Dashboard 应用。

# 1>dashboard.log
# 将标准输出(文件描述符 1)重定向到 dashboard.log 文件。

# 1 代表标准输出(stdout),通常指程序正常运行时打印的信息。

# > 是重定向符号,将输出写入指定文件(覆盖原内容)。
# 等价写法:>dashboard.log 默认也是重定向标准输出。

# 2>&1
# 将标准错误(文件描述符 2)重定向到标准输出(文件描述符 1)当前指向的位置。

# 2 代表标准错误(stderr),通常指程序运行时的错误信息。

# >& 用于将一个输出流合并到另一个输出流。
# 由于前面已经把标准输出指向了 dashboard.log,这里的效果就是将标准错误也写入同一个 dashboard.log 文件,实现所有输出(正常日志 + 错误日志)的统一记录。

# &
# 将整个命令放到后台运行。终端不会阻塞,可以继续执行其他命令;但关闭终端时进程可能退出(取决于 shell 的 hup 设置),如需长期运行建议配合 nohup 使用。
java -jar rocketmq-dashboard-2.0.0.jar 1>dashboard.log 2>&1 &

Dashboard 客户端端口默认为 8080,所以通常 RocketMQ Dashboard 页面访问路径为 ip:8080

RocketMQ Dashboard 页面

4. 常见问题

Maven 打包 rocketmq-dashboard 失败,frontend-maven-plugin 下载 yarn 超时或 SSL 握手失败

原因分析
  1. frontend-maven-plugin 在构建前端资源时,需要从 GitHub 下载 yarn 和 node 的二进制文件(默认地址 https://github.com/...)。

  2. 由于网络不稳定或防火墙干扰,下载过程容易超时或出现 SSL 握手错误(如 Remote host terminated the handshake: SSL peer shut down incorrectly)。

解决方案

修改 pom.xml,配置国内镜像源(推荐)

frontend-maven-plugin 指定国内镜像源(如华为云),可大幅提高下载成功率,避免网络问题。

xml
<plugin>
    <groupId>com.github.eirslett</groupId>
    <artifactId>frontend-maven-plugin</artifactId>
    <version>1.11.3</version>
    <configuration>
      <workingDirectory>frontend</workingDirectory>
      <installDirectory>target</installDirectory>

      <!-- 新增部分 -->
      <!-- 指定 Node.js 和 Yarn 的下载源为华为云镜像 -->
      <nodeDownloadRoot>https://repo.huaweicloud.com/nodejs/</nodeDownloadRoot>
      <yarnDownloadRoot>https://repo.huaweicloud.com/yarn/</yarnDownloadRoot>
      <!-- 可选:将安装目录移到 target 下,避免权限问题 -->
      <installDirectory>${project.build.directory}/frontend</installDirectory>
        
    </configuration>
    <!-- ... 其他 executions 保持不变 ... -->
</plugin>

Rocketmq-dashboard 启动成功,但 Windows 无法访问

原因分析

虚拟机 Linux 系统的防火墙未开放 RocketMQ_Dashboard 核心端口,导致网络请求被拦截。

解决方案
方案1:开放 Linux 防火墙端口(推荐)
开放 Linux 防火墙端口
bash
# 开放核心端口 8080(TCP)- 客户端入口
firewall-cmd --permanent --add-port=8080/tcp

# 重新加载防火墙配置使修改生效
firewall-cmd --reload

# 验证端口是否开放成功
firewall-cmd --list-ports
方案 2:临时关闭防火墙(测试环境可选)
bash
# CentOS/RHEL 系统
systemctl stop firewalld
systemctl disable firewalld

# Ubuntu/Debian 系统
ufw disable

RocketMQ 分布式集群搭建

单节点 RocketMQ 服务无法直接用于生产环境。一旦服务器出现异常,例如磁盘损坏,存储在本地磁盘上的所有数据都会丢失,RocketMQ 中的消息也会随之丢失,会造成严重的业务影响。因此,我们需要搭建RocketMQ 分布式集群,从架构层面避免单点故障带来的数据风险。

RocketMQ 分布式集群采用主从架构实现高可用。在多台服务器组成的集群中,一部分节点作为 Master 节点,负责处理客户端的读写请求;另一部分节点作为 Slave 节点,用于同步备份 Master 节点的数据。这样一来,即使 Master 节点因磁盘损坏等原因发生故障,Slave 节点仍保存着完整的备份数据,确保消息数据不会丢失

RocketMQ分布式集群架构图

这种主从架构的集群下,客户端发送的消息会分散保存到 broker-1-master 和 broker-2-master 两个服务上,然后每个 master 服务都配有一个 slave 服务,可以备份对应 master 服务上的消息,这样就可以防止单点故障造成的消息丢失问题。

1. 服务器预备

准备七台相同的 Linux 服务器,参照 RocketMQ安装(Linux环境),在 七个虚拟机服务器上都安装上 RocketMQ 服务,无需启动。

为了便于观察,本次搭建一个2主2从(1个 Master 对应一个 Slave)的 RocketMQ 集群,并将主节点和从节点都分别部署在不同的服务器上。

机器名角色详细描述
node1nameserver 实例1RocketMQ 集群的路由注册与发现中心,Broker 启动时向其注册,客户端通过 NameServer 获取 Broker 地址。
node2nameserver 实例2与 node1、node3 组成高可用 NameServer 集群,任一节点故障不影响集群整体路由服务。
node3nameserver 实例3进一步保证 NameServer 集群的可靠性,所有 NameServer 节点数据最终一致,客户端可随机连接。
node4broker_a_master_1broker_a 组 主节点 1,负责消息的写入和读取,与 node6 (broker_a_slave_1) 组成主从结构,通过同步或异步方式复制数据。
node5broker_b_master_1broker_b 组主节点 1,负责消息的写入和读取,与 node7 (broker_b_slave_1) 组成主从结构,通过同步或异步方式复制数据。
node6broker_a_slave_1broker_a 组从节点 1,从 node4 同步数据,默认支持消息读取(拉取模式),当 broker_a_master_1 故障时可自动切换为只读服务。
node7broker_b_slave_1broker_b 组 从节点 1,从 node5 同步数据,默认支持消息读取(拉取模式),当 broker_b_master_1 故障时可自动切换为只读服务。

2. 虚拟机域名配置

为每个虚拟机配置域名,每个虚拟机的 hosts 文件都进行如下修改

shell
# 编辑 /etc 目录下的 hosts 文件,添加域名
vim /etc/hosts
shell
#  hosts 文件末尾添加以下内容 IP 根据七台虚拟机的实际 IP 地址进行修改
192.168.218.136 node1
192.168.218.137 node2
192.168.218.138 node3
192.168.218.139 node4
192.168.218.140 node5
192.168.218.141 node6
192.168.218.142 node7

3. Broker 配置

2m-2s-sync (两主两从同步)集群配置(以 broker-a 为例,broker-b 同理)

在 node4 服务器上实现 broker_a_master_1 服务,新建或修改 broker_a_master_1.properties 配置文件。配置文件建议路径 $ROCKETMQ_HOME/conf/2m-2s-sync

properties
# 所属集群名字
brokerClusterName=RocketMQ-Cluster
# Broker名字,同一组主从相同 brokerName 相同的多个服务会有⼀套相同的数据副本
brokerName=broker-a
# 0 表示 Master,>0 表示 Slave
# master节点需要固定配置为0,负责响应客户端的请求。slave节点配置成其他任意数字,负责备份master上的消息。
brokerId=0
# NameServer 地址列表,nameserver服务默认占⽤9876端⼝,多个用分号分隔
namesrvAddr=node1:9876;node2:9876;node3:9876
# 监听端口
listenPort=10911
# 存储路径
storePathRootDir=/home/rocketmq/store-a-m-1
# CommitLog 存储路径
storePathCommitLog=/home/rocketmq/store-a-m-1/commitlog
# 消费队列存储路径
storePathConsumeQueue=/home/rocketmq/store-a-m-1/consumequeue
# 消息索引存储路径
storePathIndex=/home/rocketmq/store-a-m-1/index
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=48
# Broker 角色:SYNC_MASTER(同步双写 Master)
# 三个可选项:ASYNC_MASTER,SYNC_MASTER 和 SLAVE。其中,ASYNC_MASTER 和 SYNC_MASTER 表示当前节点是 master 节点,SLAVE则表示 slave 节点。
brokerRole=SYNC_MASTER
# 刷盘方式:ASYNC_FLUSH(异步刷盘)或 SYNC_FLUSH(同步刷盘)
flushDiskType=ASYNC_FLUSH
# 本机IP地址,如果服务器有多个网卡,需要指定
# brokerIP1=node4公网/内网IP

在 node6 服务器上实现 broker_a_slave_1 服务,新建或修改 broker_a_slave_1.properties 配置文件。配置文件建议路径 $ROCKETMQ_HOME/conf/2m-2s-sync

properties
# 所属集群名字
brokerClusterName=RocketMQ-Cluster
# Broker名字,同一组主从相同 brokerName 相同的多个服务会有⼀套相同的数据副本
brokerName=broker-a
# 0 表示 Master,>0 表示 Slave
# master节点需要固定配置为0,负责响应客户端的请求。slave节点配置成其他任意数字,负责备份master上的消息。
brokerId=1
# NameServer 地址列表,nameserver服务默认占⽤9876端⼝,多个用分号分隔
namesrvAddr=node1:9876;node2:9876;node3:9876
# 监听端口
listenPort=10911
# 存储路径
storePathRootDir=/home/rocketmq/store-a-s-1
# CommitLog 存储路径
storePathCommitLog=/home/rocketmq/store-a-s-1/commitlog
# 消费队列存储路径
storePathConsumeQueue=/home/rocketmq/store-a-s-1/consumequeue
# 消息索引存储路径
storePathIndex=/home/rocketmq/store-a-s-1/index
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=48
# Broker 角色:SYNC_MASTER(同步双写 Master)
# 三个可选项:ASYNC_MASTER,SYNC_MASTER 和 SLAVE。其中,ASYNC_MASTER 和 SYNC_MASTER 表示当前节点是 master 节点,SLAVE则表示 slave 节点。
brokerRole=SLAVE
# 刷盘方式:ASYNC_FLUSH(异步刷盘)或 SYNC_FLUSH(同步刷盘)
flushDiskType=ASYNC_FLUSH
# 本机IP地址,如果服务器有多个网卡,需要指定
# brokerIP1=node4公网/内网IP

4. 节点部署

部署 NameServer 服务

NameServer 服务不需要做特别的配置,在 node1,node2,node3 三台服务器上分别直接进行部署和验证即可。

NameServer 服务部署
shell
# 进入 bin 目录
cd $ROCKETMQ_HOME/bin

# 启动 NameServer (后台运行)
# nohup 表示忽略挂起信号,& 表示后台运行
nohup sh mqnamesrv &
验证 NameServer 是否启动成功
shell
# 查看进程
jps -l | grep NamesrvStartup
# 或者查看日志 (日志默认在 ~/logs/rocketmqlogs/namesrv.log)
# 看到 `The Name Server boot success...` 字样即表示成功。
tail -f ~/logs/rocketmqlogs/namesrv.log

部署 Broker 服务

在 node4,node5,node6,node7 四台服务器上分别进行部署和验证。

node4 启动 broker-a-master-1 服务
shell
# 切换到 RocketMQ 安装目录下的 bin 目录,以便直接使用 mqbroker 命令
cd $ROCKETMQ_HOME/bin

# 使用 nohup 将 Broker 进程挂载到后台运行,避免因终端关闭而退出
# -c 选项指定 Broker 的配置文件路径(此处为相对路径,基于当前所在目录)
# 配置文件为两主两从同步双写模式下的 broker_a 组 Master 节点配置
# 最后的 & 符号表示将命令放入后台执行,立即返回 Shell 提示符
nohup mqbroker -c ../conf/2m-2s-sync/broker_a_master_1.properties &
node5 启动 broker-a-slave-1 服务
shell
# 切换到 RocketMQ 安装目录下的 bin 目录,以便直接使用 mqbroker 命令
cd $ROCKETMQ_HOME/bin

# 使用 nohup 将 Broker 进程挂载到后台运行,避免因终端关闭而退出
# -c 选项指定 Broker 的配置文件路径(此处为相对路径,基于当前所在目录)
# 配置文件为两主两从同步双写模式下的 broker_a 组 Slave 节点配置
# 最后的 & 符号表示将命令放入后台执行,立即返回 Shell 提示符
nohup mqbroker -c ../conf/2m-2s-sync/broker_a_slave_1.properties &
node6 启动 broker-b-master-1 服务
shell
# 切换到 RocketMQ 安装目录下的 bin 目录,以便直接使用 mqbroker 命令
cd $ROCKETMQ_HOME/bin

# 使用 nohup 将 Broker 进程挂载到后台运行,避免因终端关闭而退出
# -c 选项指定 Broker 的配置文件路径(此处为相对路径,基于当前所在目录)
# 配置文件为两主两从同步双写模式下的 broker_b 组 Master 节点配置
# 最后的 & 符号表示将命令放入后台执行,立即返回 Shell 提示符
nohup mqbroker -c ../conf/2m-2s-sync/broker_b_master_1.properties &
node7 启动 broker-b-slave-1 服务
shell
# 切换到 RocketMQ 安装目录下的 bin 目录,以便直接使用 mqbroker 命令
cd $ROCKETMQ_HOME/bin

# 使用 nohup 将 Broker 进程挂载到后台运行,避免因终端关闭而退出
# -c 选项指定 Broker 的配置文件路径(此处为相对路径,基于当前所在目录)
# 配置文件为两主两从同步双写模式下的 broker_b 组 Slave 节点配置
# 最后的 & 符号表示将命令放入后台执行,立即返回 Shell 提示符
nohup mqbroker -c ../conf/2m-2s-sync/broker_b_slave_1.properties &

5. 部署结果查看

1. 可视化管理服务预备

参照RocketMQ可视化管理服务搭建进行安装并部署

2. 核心配置 连接 NameServer 集群

在 Jar 包的同级目录下创建 application.properties 文件,写入以下配置,这样每次启动时配置会自动加载。

properties
# application.properties
# 指定NameServer地址,多个地址用分号隔开
rocketmq.config.namesrvAddr=node1:9876;node2:9876;node3:9876
# 可选:指定数据文件存储路径
# rocketmq.config.dataPath=/tmp/rocketmq-console/data

3. 启动 Dashboard

使用 java -jar 命令启动,并通过命令行参数指定 NameServer 地址。如果已配置 application.properties 文件,则无需再加此参数

shell
# 启动命令,将 jar 包名替换为你实际的文件名

# 方式A:如果已配置 application.properties,直接启动即可
nohup java -jar rocketmq-dashboard-2.0.0.jar > dashboard.log 2>&1 &

# 方式B:如果没有配置文件,直接通过参数指定
# nohup java -jar rocketmq-dashboard-2.0.0.jar --rocketmq.config.namesrvAddr="node1:9876;node2:9876;node3:9876" > dashboard.log 2>&1 &

nameserver集群部署结果

broker集群部署结果