RocketMQ简介

吴书松
吴书松
发布于 2025-05-08 / 10 阅读
0

RocketMQ简介

RocketMQ 是阿里巴巴开源的一款 分布式消息中间件,其设计目标是处理高吞吐、高可靠、低延迟的消息传递,尤其适用于:

  • 消息队列

  • 分布式事务

  • 高并发异步处理

  • 日志收集、监控

RocketMQ 核心概念

  • Producer 发送消息到 Broker,消息根据 Topic 和 Queue 存储在 Broker 中。

  • Consumer 从 Broker 中消费消息,支持 Push 模式(主动拉取消息)和 Pull 模式(消费者定时拉取消息)。

  • NameServer 提供 Broker 路由信息,Producer 和 Consumer 通过 NameServer 获取到具体的 Broker 地址。

  • Broker 支持 集群部署,分布式存储消息,保证高可用性。

Producer 向 Broker 发送消息

通过 NameServer 查询到 Broker 地址
发送消息到 Broker

Broker 存储消息

按 Topic、Queue 存储
支持消息持久化到磁盘Copy to clipboardErrorCopied

Consumer 消费消息

通过 NameServer 获取 Broker 路由信息
消费队列中的消息,消费者可以根据消息顺序和消息的消费模式来决定消费顺序

topic和队列关系

  • 一对多关系:一个Topic可以包含多个Queue,但一个Queue只能属于一个Topic。

  • 消息写入:生产者发送消息到Topic,Broker根据配置将消息写入到Topic下的某个Queue。

  • 消息读取:消费者订阅Topic,从Topic下的多个Queue中拉取消息进行消费。

  • 一个分组下的消费者,只能监听一个队列,一个队列中的一个消息只能被一个分组下的一个消费者消费

创建主题,设置队列数量

一般几个消费者,就创建几个队列

命令创建

mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t your_topic_name -h 2 -n 4

  • -n 后面跟的是 NameServer 的地址。

  • -c 后面跟的是集群名称(如果你的集群不是默认的,需要指定)。

  • -t 后面跟的是要更新的主题名称。

  • -h 表示主题的消息存储策略,通常设置为2表示分散存储。

  • -n 表示队列数量。

代码创建

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.common.RemotingHelper;
 
public class CreateTopic {
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("localhost:9876"); // 设置NameServer地址
        producer.start();
 
        // 创建TopicConfig实例并设置属性
        TopicConfig topicConfig = new TopicConfig("your_topic_name");
        topicConfig.setReadQueueNums(4); // 设置队列数量
        topicConfig.setWriteQueueNums(4); // 设置写队列数量,通常与读队列数量相同
        topicConfig.setPerm(org.apache.rocketmq.common.TopicConfig.Perm.PERM_READ | org.apache.rocketmq.common.TopicConfig.Perm.PERM_WRITE); // 设置权限
        topicConfig.setTopicFilterType(org.apache.rocketmq.common.TopicFilterType.TAG_FILTER); // 设置过滤类型,默认为TAG过滤
 
        // 创建或更新主题
        producer.getDefaultMQProducerImpl().getmQAdminImpl().createTopicInNameServer(topicConfig);
 
        producer.shutdown();
    }
}

RocketMQ 消息模型

  • 点对点(P2P)模式

    一个消息队列只有一个消费者消费。
    多个生产者 可以向同一个 Topic 发送消息,而 消费者 只会从一个队列中消费消息。Copy to clipboardErrorCopied
  • 发布/订阅(Pub/Sub)模式

一个 Topic 对应多个 消费者,消费者并行消费不同队列的消息。
每个消费者都接收到同一个消息副本。Copy to clipboardErrorCopied
  • 模型特点

    1、消息分布:在一个主题下的所有队列之间,消息会尽量均匀地分布,以达到负载均衡的效果。

    2、消费模型:消费者可以选择集群模式或广播模式消费消息。

    集群模式:默认的消费方式,一组消费者组成一个消费组,共同消费同一主题下的所有队列。每个队列只会被组内的一个消费者实例消费,保证了消息不会被重复消费。
    广播模式:每个消费者都会收到该主题下的所有消息,适用于需要所有消费者都处理每条消息的场景。Copy to clipboardErrorCopied

3、顺序消息:如果需要严格的消息顺序,那么必须将相关的消息发送到相同的队列,并且这个队列只能被单个消费者线程消费

RocketMQ 消息传递的可靠性保障

  • 消息持久化

  默认启用,确保消息存储到磁盘,避免丢失。
  支持 同步刷盘、异步刷盘 两种方式配置,控制持久化的性能与可靠性。Copy to clipboardErrorCopied
  • 消息的幂等性

消息消费者要确保幂等性,防止多次消费重复消息。
RocketMQ 提供消息 唯一标识符,可以在消费者端处理幂等性问题。Copy to clipboardErrorCopied
  • 消息的事务性

支持 RocketMQ 的事务消息,通过执行本地事务来确保分布式事务的可靠性。
事务消息:生产者发送消息前,会先执行本地事务,成功后提交消息,失败则回滚。

RocketMQ 消息的顺序性保证

  • RocketMQ 可以保证 严格顺序消息,即同一 消息队列 内的消息会按顺序消费。

  • 可以设置 顺序消费 来保证处理顺序。

注意:

顺序消费 只能保证同一队列内的顺序,多个队列并发消费时消息的顺序无法保证。

RocketMQ 消息消费模式

  • Push 模式

消息队列推送消息到消费者,消费者通过 回调函数 进行消息处理。
优点:实时性强,及时消费。
缺点:可能面临消息积压或消息延迟。Copy to clipboardErrorCopied
  • Pull 模式

消费者主动拉取消息。
优点:控制消费速率,避免消息过多导致资源溢出。
缺点:可能存在延迟。

RocketMQ 的事务消息

RocketMQ 提供了事务消息功能,可以用于分布式事务的处理。

  • 消息发送:通过事务消息发送消息。

  • 事务执行:执行本地事务逻辑,返回事务执行结果。

  • 消息提交或回滚:根据事务执行结果,决定提交消息还是回滚。

示例:

TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);

RocketMQ 优势与应用场景

高吞吐、高可靠

通过分布式架构和高效的消息存储与消费机制,RocketMQ 在大规模应用场景下表现出色。

支持分布式事务

RocketMQ 提供了完整的事务消息支持,可以保证多系统、分布式环境下的事务一致性。

灵活的消费模式

提供了 Push 和 Pull 两种消费模式,适应不同的应用场景。

应用场景

电商订单处理:确保订单的可靠性、幂等性。

日志收集:异步化日志的存储与处理。

异步通知:消息通知、系统集成。

分布式事务:确保各个服务的数据一致性。

RocketMQ 的集群与高可用配置

集群部署

  • RocketMQ 支持 Broker 集群 模式,可以水平扩展。

  • 支持 NameServer 集群,保证 高可用 和 负载均衡。

消息复制

在多个 Broker 之间进行消息的 主从复制,保证消息不丢失。

负载均衡

支持 消息队列分区 和 多消费者并发消费,通过负载均衡实现高效的消息传递。

RocketMQ vs Kafka 比较

常见面试问题

总结

RocketMQ = 高性能、高可靠的分布式消息队列,支持事务、顺序、异步处理,广泛应用于金融、电商、日志收集等领域。