RocketMQ 是阿里巴巴开源的一款 分布式消息中间件,其设计目标是处理高吞吐、高可靠、低延迟的消息传递,尤其适用于:
消息队列
分布式事务
高并发异步处理
日志收集、监控
RocketMQ 核心概念

Producer 发送消息到 Broker,消息根据 Topic 和 Queue 存储在 Broker 中。
Consumer 从 Broker 中消费消息,支持 Push 模式(主动拉取消息)和 Pull 模式(消费者定时拉取消息)。
NameServer 提供 Broker 路由信息,Producer 和 Consumer 通过 NameServer 获取到具体的 Broker 地址。
Broker 支持 集群部署,分布式存储消息,保证高可用性。
Producer 向 Broker 发送消息
通过 NameServer 查询到 Broker 地址
发送消息到 Broker
Broker 存储消息
按 Topic、Queue 存储
支持消息持久化到磁盘Copy to clipboardErrorCopiedConsumer 消费消息
通过 NameServer 获取 Broker 路由信息
消费队列中的消息,消费者可以根据消息顺序和消息的消费模式来决定消费顺序topic和队列关系
一对多关系:一个Topic可以包含多个Queue,但一个Queue只能属于一个Topic。
消息写入:生产者发送消息到Topic,Broker根据配置将消息写入到Topic下的某个Queue。
消息读取:消费者订阅Topic,从Topic下的多个Queue中拉取消息进行消费。
一个分组下的消费者,只能监听一个队列,一个队列中的一个消息只能被一个分组下的一个消费者消费
创建主题,设置队列数量
一般几个消费者,就创建几个队列
命令创建
mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t your_topic_name -h 2 -n 4-n 后面跟的是 NameServer 的地址。
-c 后面跟的是集群名称(如果你的集群不是默认的,需要指定)。
-t 后面跟的是要更新的主题名称。
-h 表示主题的消息存储策略,通常设置为2表示分散存储。
-n 表示队列数量。
代码创建
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class CreateTopic {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876"); // 设置NameServer地址
producer.start();
// 创建TopicConfig实例并设置属性
TopicConfig topicConfig = new TopicConfig("your_topic_name");
topicConfig.setReadQueueNums(4); // 设置队列数量
topicConfig.setWriteQueueNums(4); // 设置写队列数量,通常与读队列数量相同
topicConfig.setPerm(org.apache.rocketmq.common.TopicConfig.Perm.PERM_READ | org.apache.rocketmq.common.TopicConfig.Perm.PERM_WRITE); // 设置权限
topicConfig.setTopicFilterType(org.apache.rocketmq.common.TopicFilterType.TAG_FILTER); // 设置过滤类型,默认为TAG过滤
// 创建或更新主题
producer.getDefaultMQProducerImpl().getmQAdminImpl().createTopicInNameServer(topicConfig);
producer.shutdown();
}
}
RocketMQ 消息模型
点对点(P2P)模式
一个消息队列只有一个消费者消费。 多个生产者 可以向同一个 Topic 发送消息,而 消费者 只会从一个队列中消费消息。Copy to clipboardErrorCopied发布/订阅(Pub/Sub)模式
一个 Topic 对应多个 消费者,消费者并行消费不同队列的消息。
每个消费者都接收到同一个消息副本。Copy to clipboardErrorCopied模型特点
1、消息分布:在一个主题下的所有队列之间,消息会尽量均匀地分布,以达到负载均衡的效果。
2、消费模型:消费者可以选择集群模式或广播模式消费消息。
集群模式:默认的消费方式,一组消费者组成一个消费组,共同消费同一主题下的所有队列。每个队列只会被组内的一个消费者实例消费,保证了消息不会被重复消费。
广播模式:每个消费者都会收到该主题下的所有消息,适用于需要所有消费者都处理每条消息的场景。Copy to clipboardErrorCopied3、顺序消息:如果需要严格的消息顺序,那么必须将相关的消息发送到相同的队列,并且这个队列只能被单个消费者线程消费
RocketMQ 消息传递的可靠性保障
消息持久化
默认启用,确保消息存储到磁盘,避免丢失。
支持 同步刷盘、异步刷盘 两种方式配置,控制持久化的性能与可靠性。Copy to clipboardErrorCopied消息的幂等性
消息消费者要确保幂等性,防止多次消费重复消息。
RocketMQ 提供消息 唯一标识符,可以在消费者端处理幂等性问题。Copy to clipboardErrorCopied消息的事务性
支持 RocketMQ 的事务消息,通过执行本地事务来确保分布式事务的可靠性。
事务消息:生产者发送消息前,会先执行本地事务,成功后提交消息,失败则回滚。RocketMQ 消息的顺序性保证
RocketMQ 可以保证 严格顺序消息,即同一 消息队列 内的消息会按顺序消费。
可以设置 顺序消费 来保证处理顺序。
注意:
顺序消费 只能保证同一队列内的顺序,多个队列并发消费时消息的顺序无法保证。
RocketMQ 消息消费模式
Push 模式
消息队列推送消息到消费者,消费者通过 回调函数 进行消息处理。
优点:实时性强,及时消费。
缺点:可能面临消息积压或消息延迟。Copy to clipboardErrorCopiedPull 模式
消费者主动拉取消息。
优点:控制消费速率,避免消息过多导致资源溢出。
缺点:可能存在延迟。RocketMQ 的事务消息
RocketMQ 提供了事务消息功能,可以用于分布式事务的处理。
消息发送:通过事务消息发送消息。
事务执行:执行本地事务逻辑,返回事务执行结果。
消息提交或回滚:根据事务执行结果,决定提交消息还是回滚。
示例:
TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);RocketMQ 优势与应用场景
高吞吐、高可靠
通过分布式架构和高效的消息存储与消费机制,RocketMQ 在大规模应用场景下表现出色。
支持分布式事务
RocketMQ 提供了完整的事务消息支持,可以保证多系统、分布式环境下的事务一致性。
灵活的消费模式
提供了 Push 和 Pull 两种消费模式,适应不同的应用场景。
应用场景
电商订单处理:确保订单的可靠性、幂等性。
日志收集:异步化日志的存储与处理。
异步通知:消息通知、系统集成。
分布式事务:确保各个服务的数据一致性。
RocketMQ 的集群与高可用配置
集群部署
RocketMQ 支持 Broker 集群 模式,可以水平扩展。
支持 NameServer 集群,保证 高可用 和 负载均衡。
消息复制
在多个 Broker 之间进行消息的 主从复制,保证消息不丢失。
负载均衡
支持 消息队列分区 和 多消费者并发消费,通过负载均衡实现高效的消息传递。
RocketMQ vs Kafka 比较

常见面试问题

总结
RocketMQ = 高性能、高可靠的分布式消息队列,支持事务、顺序、异步处理,广泛应用于金融、电商、日志收集等领域。