高级功能
消息存储
分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。
- 消息生成者发送消息
- MQ收到消息,将消息进行持久化,在存储中新增一条记录
- 返回ACK给生产者
- MQ push 消息给对应的消费者,然后等待消费者返回ACK
- 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤
- MQ删除消息
存储介质
- 关系型数据库DB
Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障
文件系统
目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。
性能对比
文件系统>关系型数据库DB
消息的存储和发送
消息存储
磁盘如果使用得当,磁盘的速度完全可以匹配上网络 的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度。
消息发送
Linux操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。
一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:
1)read;读取本地文件内容;
2)write;将读取的内容通过网络发送出去。
这两个看似简单的操作,实际进行了4 次数据复制,分别是:
- 从磁盘复制数据到内核态内存;
- 从内核态内存复 制到用户态内存;
- 然后从用户态 内存复制到网络驱动的内核态内存;
- 最后是从网络驱动的内核态内存复 制到网卡中进行传输。
通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过MappedByteBuffer实现的
RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。
这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了
消息存储结构
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。
- CommitLog:存储消息的元数据
- ConsumerQueue:存储消息在CommitLog的索引
- IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程
刷盘机制
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时 候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。
同步刷盘
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。
异步刷盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
配置
同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。
高可用性机制
RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的。
Master和Slave的区别:在Broker的配置文件中,参数 brokerId的值为0表明这个Broker是Master,大于0表明这个Broker是 Slave,同时brokerRole参数也会说明这个Broker是Master还是Slave。
Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能和Master角色的Broker连接写入消息;Consumer可以连接 Master角色的Broker,也可以连接Slave角色的Broker来读取消息。
消息消费高可用
在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。这就达到了消费端的高可用性。
消息发送高可用
在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同 brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可 用后,其他组的Master仍然可用,Producer仍然可以发送消息。 RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足, 需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文 件,用新的配置文件启动Broker。
消息主从复制
如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。
同步复制
同步复制方式是等Master和Slave均写 成功后才反馈给客户端写成功状态;
在同步复制方式下,如果Master出故障, Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入 延迟,降低系统吞吐量。
异步复制
异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失;
配置
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。
总结
实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH方式,由于频繁地触发磁盘写动作,会明显降低 性能。通常情况下,应该把Master和Save配置成ASYNC_FLUSH的刷盘 方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台 机器出故障,仍然能保证数据不丢,是个不错的选择。
负载均衡
Producer负载均衡
Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图:
图中箭头线条上的标号代表顺序,发布方会把第一条消息发送至 Queue 0,然后第二条消息发送至 Queue 1,以此类推。
Consumer负载均衡
集群模式
在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。
而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。
默认的分配算法是AllocateMessageQueueAveragely,如下图:
还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式,如下图:
需要注意的是,集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue。
通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。
但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。
广播模式
由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。
在实现上,其中一个不同就是在consumer分配queue的时候,所有consumer都分到所有的queue。
消息重试
顺序消息的重试
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
无序消息的重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
重试次数
消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:
第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 |
---|---|---|---|
1 | 10 秒 | 9 | 7 分钟 |
2 | 30 秒 | 10 | 8 分钟 |
3 | 1 分钟 | 11 | 9 分钟 |
4 | 2 分钟 | 12 | 10 分钟 |
5 | 3 分钟 | 13 | 20 分钟 |
6 | 4 分钟 | 14 | 30 分钟 |
7 | 5 分钟 | 15 | 1 小时 |
8 | 6 分钟 | 16 | 2 小时 |
如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。
配置方式
消费失败后,重试配置方式
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
- 返回 Action.ReconsumeLater (推荐)
- 返回 Null
- 抛出异常
1 | public class MessageListenerImpl implements MessageListener { |
消费失败后,不重试配置方式
集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回 Action.CommitMessage,此后这条消息将不会再重试。
1 | public class MessageListenerImpl implements MessageListener { |
自定义消息最大重试次数
消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:
- 最大重试次数小于等于 16 次,则重试时间间隔同上表描述。
- 最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。
1 | Properties properties = new Properties(); |
注意:
- 消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。
- 如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。
- 配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置
获取消息重试次数
消费者收到消息后,可按照如下方式获取消息的重试次数:
1 | public class MessageListenerImpl implements MessageListener { |
死信队列
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
死信特性
死信消息具有以下特性
- 不会再被消费者正常消费。
- 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。
死信队列具有以下特性:
- 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
- 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
- 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
查看死信信息
- 在控制台查询出现死信队列的主题信息
- 在消息界面根据主题查询死信消息
- 选择重新发送消息
一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。
消费幂等
消息队列 RocketMQ 消费者在接收到消息以后,有必要根据业务上的唯一 Key 对消息做幂等处理的必要性。
消费幂等的必要性
在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
处理方式
因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置:
1 | Message message = new Message(); |
订阅方收到消息时可以根据消息的 Key 进行幂等处理:
1 | consumer.subscribe("ons_test", "*", new MessageListener() { |
源码分析
环境搭建
依赖工具
- JDK :1.8+
- Maven
- IntelliJ IDEA
源码拉取
从官方仓库 https://github.com/apache/rocketmq clone
或者download
源码。
源码目录结构:
broker: broker 模块(broke 启动进程)
client :消息客户端,包含消息生产者、消息消费者相关类
- common :公共包
- dev :开发者信息(非源代码)
- distribution :部署实例文件夹(非源代码)
- example: RocketMQ 例代码
filter :消息过滤相关基础类
filtersrv:消息过滤服务器实现相关类(Filter启动进程)
- logappender:日志实现相关类
- namesrv:NameServer实现相关类(NameServer启动进程)
- openmessageing:消息开放标准
- remoting:远程通信模块,给予Netty
- srcutil:服务工具类
- store:消息存储实现相关类
- style:checkstyle相关实现
- test:测试相关类
- tools:工具类,监控命令相关实现类
导入IDEA
执行安装
1 | clean install -Dmaven.test.skip=true |
调试
创建conf
配置文件夹,从distribution
拷贝broker.conf
和logback_broker.xml
和logback_namesrv.xml
启动NameServer
- 展开namesrv模块,右键NamesrvStartup.java
- 配置ROCKETMQ_HOME
重新启动
控制台打印结果
1 | The Name Server boot success. serializeType=JSON |
启动Broker
broker.conf
配置文件内容
1 | brokerClusterName = DefaultCluster |
- 创建数据文件夹
dataDir
- 启动
BrokerStartup
,配置broker.conf
和ROCKETMQ_HOME
发送消息
- 进入example模块的
org.apache.rocketmq.example.quickstart
- 指定Namesrv地址
1 | DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); |
- 运行
main
方法,发送消息
消费消息
- 进入example模块的
org.apache.rocketmq.example.quickstart
- 指定Namesrv地址
1 | DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); |
- 运行
main
方法,消费消息
NameServer
架构设计
消息中间件的设计思路一般是基于主题订阅发布的机制,消息生产者(Producer)发送某一个主题到消息服务器,消息服务器负责将消息持久化存储,消息消费者(Consumer)订阅该兴趣的主题,消息服务器根据订阅信息(路由信息)将消息推送到消费者(Push模式)或者消费者主动向消息服务器拉去(Pull模式),从而实现消息生产者与消息消费者解耦。为了避免消息服务器的单点故障导致的整个系统瘫痪,通常会部署多台消息服务器共同承担消息的存储。那消息生产者如何知道消息要发送到哪台消息服务器呢?如果某一台消息服务器宕机了,那么消息生产者如何在不重启服务情况下感知呢?
NameServer就是为了解决以上问题设计的。
Broker消息服务器在启动的时向所有NameServer注册,消息生产者(Producer)在发送消息时之前先从NameServer获取Broker服务器地址列表,然后根据负载均衡算法从列表中选择一台服务器进行发送。NameServer与每台Broker保持长连接,并间隔30S检测Broker是否存活,如果检测到Broker宕机,则从路由注册表中删除。但是路由变化不会马上通知消息生产者。这样设计的目的是为了降低NameServer实现的复杂度,在消息发送端提供容错机制保证消息发送的可用性。
NameServer本身的高可用是通过部署多台NameServer来实现,但彼此之间不通讯,也就是NameServer服务器之间在某一个时刻的数据并不完全相同,但这对消息发送并不会造成任何影响,这也是NameServer设计的一个亮点,总之,RocketMQ设计追求简单高效。
启动流程
启动类:org.apache.rocketmq.namesrv.NamesrvStartup
步骤一
解析配置文件,填充NameServerConfig、NettyServerConfig属性值,并创建NamesrvController
代码:NamesrvController#createNamesrvController
1 | //创建NamesrvConfig |
NamesrvConfig属性
1 | private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); |
rocketmqHome:rocketmq主目录
kvConfig:NameServer存储KV配置属性的持久化路径
configStorePath:nameServer默认配置文件路径
orderMessageEnable:是否支持顺序消息
NettyServerConfig属性
1 | private int listenPort = 8888; |
listenPort:NameServer监听端口,该值默认会被初始化为9876
serverWorkerThreads:Netty业务线程池线程个数
serverCallbackExecutorThreads:Netty public任务线程池线程个数,Netty网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型未注册线程池,则由public线程池执行。
serverSelectorThreads:IO线程池个数,主要是NameServer、Broker端解析请求、返回相应的线程个数,这类线程主要是处理网路请求的,解析请求包,然后转发到各个业务线程池完成具体的操作,然后将结果返回给调用方;
serverOnewaySemaphoreValue:send oneway消息请求并发读(Broker端参数);
serverAsyncSemaphoreValue:异步消息发送最大并发度;
serverChannelMaxIdleTimeSeconds :网络连接最大的空闲时间,默认120s。
serverSocketSndBufSize:网络socket发送缓冲区大小。
serverSocketRcvBufSize: 网络接收端缓存区大小。
serverPooledByteBufAllocatorEnable:ByteBuffer是否开启缓存;
useEpollNativeSelector:是否启用Epoll IO模型。
步骤二
根据启动属性创建NamesrvController实例,并初始化该实例。NameServerController实例为NameServer核心控制器
代码:NamesrvController#initialize
1 | public boolean initialize() { |
步骤三
在JVM进程关闭之前,先将线程池关闭,及时释放资源
代码:NamesrvStartup#start
1 | //注册JVM钩子函数代码 |
路由管理
NameServer的主要作用是为消息的生产者和消息消费者提供关于主题Topic的路由信息,那么NameServer需要存储路由的基础信息,还要管理Broker节点,包括路由注册、路由删除等。
路由元信息
代码:RouteInfoManager
1 | private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; |
topicQueueTable:Topic消息队列路由信息,消息发送时根据路由表进行负载均衡
brokerAddrTable:Broker基础信息,包括brokerName、所属集群名称、主备Broker地址
clusterAddrTable:Broker集群信息,存储集群中所有Broker名称
brokerLiveTable:Broker状态信息,NameServer每次收到心跳包是会替换该信息
filterServerTable:Broker上的FilterServer列表,用于类模式消息过滤。
RocketMQ基于定于发布机制,一个Topic拥有多个消息队列,一个Broker为每一个主题创建4个读队列和4个写队列。多个Broker组成一个集群,集群由相同的多台Broker组成Master-Slave架构,brokerId为0代表Master,大于0为Slave。BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间。
路由注册
发送心跳包
RocketMQ路由注册是通过Broker与NameServer的心跳功能实现的。Broker启动时向集群中所有的NameServer发送心跳信息,每隔30s向集群中所有NameServer发送心跳包,NameServer收到心跳包时会更新brokerLiveTable缓存中BrokerLiveInfo的lastUpdataTimeStamp信息,然后NameServer每隔10s扫描brokerLiveTable,如果连续120S没有收到心跳包,NameServer将移除Broker的路由信息同时关闭Socket连接。
代码:BrokerController#start
1 | //注册Broker信息 |
代码:BrokerOuterAPI#registerBrokerAll
1 | //获得nameServer地址信息 |
代码:BrokerOutAPI#registerBroker
1 | if (oneway) { |
处理心跳包
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor
网路处理类解析请求类型,如果请求类型是为REGISTER_BROKER,则将请求转发到RouteInfoManager#regiesterBroker
代码:DefaultRequestProcessor#processRequest
1 | //判断是注册Broker信息 |
代码:DefaultRequestProcessor#registerBroker
1 | RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker( |
代码:RouteInfoManager#registerBroker
维护路由信息
1 | //加锁 |
1 | //维护brokerAddrTable |
1 | //维护topicQueueTable |
代码:RouteInfoManager#createAndUpdateQueueData
1 | private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) { |
1 | //维护brokerLiveTable |
1 | //维护filterServerList |
路由删除
1 |
|
代码:RouteInfoManager#scanNotActiveBroker
1 | public void scanNotActiveBroker() { |
代码:RouteInfoManager#onChannelDestroy
1 | //申请写锁,根据brokerAddress从brokerLiveTable和filterServerTable移除 |
1 | //维护brokerAddrTable |
1 | //维护clusterAddrTable |
1 | //维护topicQueueTable队列 |
1 | //释放写锁 |
路由发现
RocketMQ路由发现是非实时的,当Topic路由出现变化后,NameServer不会主动推送给客户端,而是由客户端定时拉取主题最新的路由。
代码:DefaultRequestProcessor#getRouteInfoByTopic
1 | public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, |
小结
Producer
消息生产者的代码都在client模块中,相对于RocketMQ来讲,消息生产者就是客户端,也是消息的提供者。
方法和属性
主要方法介绍
1 | //创建主题 |
1 | //启动 |
属性介绍
1 | producerGroup:生产者所属组 |
启动流程
代码:DefaultMQProducerImpl#start
1 | //检查生产者组是否满足要求 |
整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表
ConcurrentMap<String/ clientId /, MQClientInstance> factoryTable = new ConcurrentHashMap<String,MQClientInstance>();
同一个clientId只会创建一个MQClientInstance。
MQClientInstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道
代码:MQClientManager#getAndCreateMQClientInstance
1 | public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, |
代码:DefaultMQProducerImpl#start
1 | //注册当前生产者到到MQClientInstance管理中,方便后续调用网路请求 |
消息发送
代码:DefaultMQProducerImpl#send(Message msg)
1 | //发送消息 |
代码:DefaultMQProducerImpl#send(Message msg,long timeout)
1 | //发送消息,默认超时时间为3s |
代码:DefaultMQProducerImpl#sendDefaultImpl
1 | //校验消息 |
验证消息
代码:Validators#checkMessage
1 | public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) |
查找路由
代码:DefaultMQProducerImpl#tryToFindTopicPublishInfo
1 | private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { |
代码:TopicPublishInfo
1 | public class TopicPublishInfo { |
代码:MQClientInstance#updateTopicRouteInfoFromNameServer
1 | TopicRouteData topicRouteData; |
代码:MQClientInstance#updateTopicRouteInfoFromNameServer
1 | //判断路由是否需要更改 |
代码:MQClientInstance#updateTopicRouteInfoFromNameServer
1 | if (changed) { |
代码:MQClientInstance#topicRouteData2TopicPublishInfo
1 | public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) { |
选择队列
- 默认不启用Broker故障延迟机制
代码:TopicPublishInfo#selectOneMessageQueue(lastBrokerName)
1 | public MessageQueue selectOneMessageQueue(final String lastBrokerName) { |
代码:TopicPublishInfo#selectOneMessageQueue()
1 | //第一次选择队列 |
- 启用Broker故障延迟机制
1 | public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { |
- 延迟机制接口规范
1 | public interface LatencyFaultTolerance<T> { |
- FaultItem:失败条目
1 | class FaultItem implements Comparable<FaultItem> { |
- 消息失败策略
1 | public class MQFaultStrategy { |
原理分析
代码:DefaultMQProducerImpl#sendDefaultImpl
1 | sendResult = this.sendKernelImpl(msg, |
如果上述发送过程出现异常,则调用DefaultMQProducerImpl#updateFaultItem
1 | public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { |
代码:MQFaultStrategy#updateFaultItem
1 | public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { |
代码:MQFaultStrategy#computeNotAvailableDuration
1 | private long computeNotAvailableDuration(final long currentLatency) { |
代码:LatencyFaultToleranceImpl#updateFaultItem
1 | public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { |
发送消息
消息发送API核心入口DefaultMQProducerImpl#sendKernelImpl
1 | private SendResult sendKernelImpl( |
代码:DefaultMQProducerImpl#sendKernelImpl
1 | //获得broker网络地址信息 |
1 | //为消息分类唯一ID |
1 | //如果注册了消息发送钩子函数,在执行消息发送前的增强逻辑 |
代码:SendMessageHook
1 | public interface SendMessageHook { |
代码:DefaultMQProducerImpl#sendKernelImpl
1 | //构建消息发送请求包 |
1 | case ASYNC: //异步发送 |
1 | //如果注册了钩子函数,则发送完毕后执行钩子函数 |
批量消息发送
批量消息发送是将同一个主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。当然,并不是在同一批次中发送的消息数量越多越好,其判断依据是单条消息的长度,如果单条消息内容比较长,则打包多条消息发送会影响其他线程发送消息的响应时间,并且单批次消息总长度不能超过DefaultMQProducer#maxMessageSize。
批量消息发送要解决的问题是如何将这些消息编码以便服务端能够正确解码出每条消息的消息内容。
代码:DefaultMQProducer#send
1 | public SendResult send(Collection<Message> msgs) |
代码:DefaultMQProducer#batch
1 | private MessageBatch batch(Collection<Message> msgs) throws MQClientException { |
消息存储
消息存储核心类
1 | private final MessageStoreConfig messageStoreConfig; //消息配置属性 |
消息存储流程
消息存储入口:DefaultMessageStore#putMessage
1 | //判断Broker角色如果是从节点,则无需写入 |
代码:CommitLog#putMessage
1 | //记录消息存储时间 |
代码:MappedFile#appendMessagesInner
1 | //获得文件的写入指针 |
代码:CommitLog#doAppend
1 | //文件写入位置 |
代码:CommitLog#calMsgLength
1 | protected static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) { |
代码:CommitLog#doAppend
1 | //消息长度不能超过4M |
代码:CommitLog#putMessage
1 | //释放锁 |
存储文件
- commitLog:消息存储目录
- config:运行期间一些配置信息
- consumerqueue:消息消费队列存储目录
- index:消息索引文件存储目录
- abort:如果存在改文件寿命Broker非正常关闭
- checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerquueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。
存储文件内存映射
RocketMQ通过使用内存映射文件提高IO访问性能,无论是CommitLog、ConsumerQueue还是IndexFile,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件,文件名就为该文件第一条消息对应的全局物理偏移量。
MappedFileQueue
1 | String storePath; //存储目录 |
- 根据存储时间查询MappedFile
1 | public MappedFile getMappedFileByTime(final long timestamp) { |
- 根据消息偏移量offset查找MappedFile
1 | public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) { |
- 获取存储文件最小偏移量
1 | public long getMinOffset() { |
- 获取存储文件最大偏移量
1 | public long getMaxOffset() { |
- 返回存储文件当前写指针
1 | public long getMaxWrotePosition() { |
MappedFile
1 | int OS_PAGE_SIZE = 1024 * 4; //操作系统每页大小,默认4K |
MappedFile初始化
- 未开启
transientStorePoolEnable
。transientStorePoolEnable=true
为true
表示数据先存储到堆外内存,然后通过Commit
线程将数据提交到内存映射Buffer中,再通过Flush
线程将内存映射Buffer
中数据持久化磁盘。
1 | private void init(final String fileName, final int fileSize) throws IOException { |
开启transientStorePoolEnable
1 | public void init(final String fileName, final int fileSize, |
MappedFile提交
提交数据到FileChannel,commitLeastPages为本次提交最小的页数,如果待提交数据不满commitLeastPages,则不执行本次提交操作。如果writeBuffer如果为空,直接返回writePosition指针,无需执行commit操作,表名commit操作主体是writeBuffer。
1 | public int commit(final int commitLeastPages) { |
MappedFile#isAbleToCommit
判断是否执行commit操作,如果文件已满返回true;如果commitLeastpages大于0,则比较writePosition与上一次提交的指针commitPosition的差值,除以OS_PAGE_SIZE得到当前脏页的数量,如果大于commitLeastPages则返回true,如果commitLeastpages小于0表示只要存在脏页就提交。
1 | protected boolean isAbleToCommit(final int commitLeastPages) { |
MappedFile#commit0
具体提交的实现,首先创建WriteBuffer区共享缓存区,然后将新创建的position回退到上一次提交的位置(commitPosition),设置limit为wrotePosition(当前最大有效数据指针),然后把commitPosition到wrotePosition的数据写入到FileChannel中,然后更新committedPosition指针为wrotePosition。commit的作用就是将MappedFile的writeBuffer中数据提交到文件通道FileChannel中。
1 | protected void commit0(final int commitLeastPages) { |
MappedFile#flush
刷写磁盘,直接调用MappedByteBuffer或fileChannel的force方法将内存中的数据持久化到磁盘,那么flushedPosition应该等于MappedByteBuffer中的写指针;如果writeBuffer不为空,则flushPosition应该等于上一次的commit指针;因为上一次提交的数据就是进入到MappedByteBuffer中的数据;如果writeBuffer为空,数据时直接进入到MappedByteBuffer,wrotePosition代表的是MappedByteBuffer中的指针,故设置flushPosition为wrotePosition。
1 | public int flush(final int flushLeastPages) { |
MappedFile#getReadPosition
获取当前文件最大可读指针。如果writeBuffer为空,则直接返回当前的写指针;如果writeBuffer不为空,则返回上一次提交的指针。在MappedFile设置中,只有提交了的数据(写入到MappedByteBuffer或FileChannel中的数据)才是安全的数据
1 | public int getReadPosition() { |
MappedFile#selectMappedBuffer
查找pos到当前最大可读之间的数据,由于在整个写入期间都未曾改MappedByteBuffer的指针,如果mappedByteBuffer.slice()方法返回的共享缓存区空间为整个MappedFile,然后通过设置ByteBuffer的position为待查找的值,读取字节长度当前可读最大长度,最终返回的ByteBuffer的limit为size。整个共享缓存区的容量为(MappedFile#fileSize-pos)。故在操作SelectMappedBufferResult不能对包含在里面的ByteBuffer调用filp方法。
1 | public SelectMappedBufferResult selectMappedBuffer(int pos) { |
MappedFile#shutdown
MappedFile文件销毁的实现方法为public boolean destory(long intervalForcibly),intervalForcibly表示拒绝被销毁的最大存活时间。
1 | public void shutdown(final long intervalForcibly) { |
TransientStorePool
短暂的存储池。RocketMQ单独创建一个MappedByteBuffer内存缓存池,用来临时存储数据,数据先写入该内存映射中,然后由commit线程定时将数据从该内存复制到与目标物理文件对应的内存映射中。RocketMQ引入该机制主要的原因是提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘。
1 | private final int poolSize; //availableBuffers个数 |
初始化
1 | public void init() { |
实时更新消息消费队列与索引文件
消息消费队文件、消息属性索引文件都是基于CommitLog文件构建的,当消息生产者提交的消息存储在CommitLog文件中,ConsumerQueue、IndexFile需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。RocketMQ通过开启一个线程ReputMessageService来准实时转发CommitLog文件更新事件,相应的任务处理器根据转发的消息及时更新ConsumerQueue、IndexFile文件。
代码:DefaultMessageStore:start
1 | //设置CommitLog内存中最大偏移量 |
代码:DefaultMessageStore:run
1 | public void run() { |
代码:DefaultMessageStore:deReput
1 | //从result中循环遍历消息,一次读一条,创建DispatherRequest对象。 |
DispatchRequest
1 | String topic; //消息主题名称 |
转发到ConsumerQueue
1 | class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { |
代码:DefaultMessageStore#putMessagePositionInfo
1 | public void putMessagePositionInfo(DispatchRequest dispatchRequest) { |
代码:DefaultMessageStore#putMessagePositionInfo
1 | //依次将消息偏移量、消息长度、tag写入到ByteBuffer中 |
转发到Index
1 | class CommitLogDispatcherBuildIndex implements CommitLogDispatcher { |
代码:DefaultMessageStore#buildIndex
1 | public void buildIndex(DispatchRequest req) { |
消息队列和索引文件恢复
由于RocketMQ存储首先将消息全量存储在CommitLog文件中,然后异步生成转发任务更新ConsumerQueue和Index文件。如果消息成功存储到CommitLog文件中,转发任务未成功执行,此时消息服务器Broker由于某个愿意宕机,导致CommitLog、ConsumerQueue、IndexFile文件数据不一致。如果不加以人工修复的话,会有一部分消息即便在CommitLog中文件中存在,但由于没有转发到ConsumerQueue,这部分消息将永远复发被消费者消费。
存储文件加载
代码:DefaultMessageStore#load
判断上一次是否异常退出。实现机制是Broker在启动时创建abort文件,在退出时通过JVM钩子函数删除abort文件。如果下次启动时存在abort文件。说明Broker时异常退出的,CommitLog与ConsumerQueue数据有可能不一致,需要进行修复。
1 | //判断临时文件是否存在 |
代码:DefaultMessageStore#load
1 | //加载延时队列 |
代码:MappedFileQueue#load
加载CommitLog到映射文件
1 | //指向CommitLog文件目录 |
代码:DefaultMessageStore#loadConsumeQueue
加载消息消费队列
1 | //执行消费队列目录 |
代码:IndexService#load
加载索引文件
1 | public boolean load(final boolean lastExitOK) { |
代码:DefaultMessageStore#recover
文件恢复,根据Broker是否正常退出执行不同的恢复策略
1 | private void recover(final boolean lastExitOK) { |
代码:DefaultMessageStore#recoverTopicQueueTable
恢复ConsumerQueue后,将在CommitLog实例中保存每隔消息队列当前的存储逻辑偏移量,这也是消息中不仅存储主题、消息队列ID、还存储了消息队列的关键所在。
1 | public void recoverTopicQueueTable() { |
正常恢复
代码:CommitLog#recoverNormally
1 | public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { |
代码:MappedFileQueue#truncateDirtyFiles
1 | public void truncateDirtyFiles(long offset) { |
异常恢复
Broker异常停止文件恢复的实现为CommitLog#recoverAbnormally。异常文件恢复步骤与正常停止文件恢复流程基本相同,其主要差别有两个。首先,正常停止默认从倒数第三个文件开始进行恢复,而异常停止则需要从最后一个文件往前走,找到第一个消息存储正常的文件。其次,如果CommitLog目录没有消息文件,如果消息消费队列目录下存在文件,则需要销毁。
代码:CommitLog#recoverAbnormally
1 | if (!mappedFiles.isEmpty()) { |
刷盘机制
RocketMQ的存储是基于JDK NIO的内存映射机制(MappedByteBuffer)的,消息存储首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘。
同步刷盘
消息追加到内存后,立即将数据刷写到磁盘文件
代码:CommitLog#handleDiskFlush
1 | //刷盘服务 |
GroupCommitRequest
1 | long nextOffset; //刷盘点偏移量 |
代码:GroupCommitService#run
1 | public void run() { |
代码:GroupCommitService#doCommit
1 | private void doCommit() { |
异步刷盘
在消息追加到内存后,立即返回给消息发送端。如果开启transientStorePoolEnable,RocketMQ会单独申请一个与目标物理文件(commitLog)同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到物理文件的内存映射中,然后刷写到磁盘。如果未开启transientStorePoolEnable,消息直接追加到物理文件直接映射文件中,然后刷写到磁盘中。
开启transientStorePoolEnable后异步刷盘步骤:
- 将消息直接追加到ByteBuffer(堆外内存)
- CommitRealTimeService线程每隔200ms将ByteBuffer新追加内容提交到MappedByteBuffer中
- MappedByteBuffer在内存中追加提交的内容,wrotePosition指针向后移动
- commit操作成功返回,将committedPosition位置恢复
- FlushRealTimeService线程默认每500ms将MappedByteBuffer中新追加的内存刷写到磁盘
代码:CommitLog$CommitRealTimeService#run
提交线程工作机制
1 | //间隔时间,默认200ms |
代码:CommitLog$FlushRealTimeService#run
刷盘线程工作机制
1 | //表示await方法等待,默认false |
过期文件删除机制
由于RocketMQ操作CommitLog、ConsumerQueue文件是基于内存映射机制并在启动的时候回加载CommitLog、ConsumerQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以要引入一种机制来删除已过期的文件。RocketMQ顺序写CommitLog、ConsumerQueue文件,所有写操作全部落在最后一个CommitLog或者ConsumerQueue文件上,之前的文件在下一个文件创建后将不会再被更新。RocketMQ清除过期文件的方法时:如果当前文件在在一定时间间隔内没有再次被消费,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为72小时,通过在Broker配置文件中设置fileReservedTime来改变过期时间,单位为小时。
代码:DefaultMessageStore#addScheduleTask
1 | private void addScheduleTask() { |
代码:DefaultMessageStore#cleanFilesPeriodically
1 | private void cleanFilesPeriodically() { |
代码:DefaultMessageStore#deleteExpiredFiles
1 | private void deleteExpiredFiles() { |
删除文件操作的条件
- 指定删除文件的时间点,RocketMQ通过deleteWhen设置一天的固定时间执行一次删除过期文件操作,默认4点
- 磁盘空间如果不充足,删除过期文件
- 预留,手工触发。
代码:CleanCommitLogService#isSpaceToDelete
当磁盘空间不足时执行删除过期文件
1 | private boolean isSpaceToDelete() { |
代码:MappedFileQueue#deleteExpiredFileByTime
执行文件销毁和删除
1 | for (int i = 0; i < mfsLength; i++) { |
小结
RocketMQ的存储文件包括消息文件(Commitlog)、消息消费队列文件(ConsumerQueue)、Hash索引文件(IndexFile)、监测点文件(checkPoint)、abort(关闭异常文件)。单个消息存储文件、消息消费队列文件、Hash索引文件长度固定以便使用内存映射机制进行文件的读写操作。RocketMQ组织文件以文件的起始偏移量来命令文件,这样根据偏移量能快速定位到真实的物理文件。RocketMQ基于内存映射文件机制提供了同步刷盘和异步刷盘两种机制,异步刷盘是指在消息存储时先追加到内存映射文件,然后启动专门的刷盘线程定时将内存中的文件数据刷写到磁盘。
CommitLog,消息存储文件,RocketMQ为了保证消息发送的高吞吐量,采用单一文件存储所有主题消息,保证消息存储是完全的顺序写,但这样给文件读取带来了不便,为此RocketMQ为了方便消息消费构建了消息消费队列文件,基于主题与队列进行组织,同时RocketMQ为消息实现了Hash索引,可以为消息设置索引键,根据所以能够快速从CommitLog文件中检索消息。
当消息达到CommitLog后,会通过ReputMessageService线程接近实时地将消息转发给消息消费队列文件与索引文件。为了安全起见,RocketMQ引入abort文件,记录Broker的停机是否是正常关闭还是异常关闭,在重启Broker时为了保证CommitLog文件,消息消费队列文件与Hash索引文件的正确性,分别采用不同策略来恢复文件。
RocketMQ不会永久存储消息文件、消息消费队列文件,而是启动文件过期机制并在磁盘空间不足或者默认凌晨4点删除过期文件,文件保存72小时并且在删除文件时并不会判断该消息文件上的消息是否被消费。
Consumer
消息消费概述
消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费者组可订阅多个主题,消费组之间有ff式和广播模式两种消费模式。集群模式,主题下的同一条消息只允许被其中一个消费者消费。广播模式,主题下的同一条消息,将被集群内的所有消费者消费一次。消息服务器与消费者之间的消息传递也有两种模式:推模式、拉模式。所谓的拉模式,是消费端主动拉起拉消息请求,而推模式是消息达到消息服务器后,推送给消息消费者。RocketMQ消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。
集群模式下,多个消费者如何对消息队列进行负载呢?消息队列负载机制遵循一个通用思想:一个消息队列同一个时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。
RocketMQ支持局部顺序消息消费,也就是保证同一个消息队列上的消息顺序消费。不支持消息全局顺序消费,如果要实现某一个主题的全局顺序消费,可以将该主题的队列数设置为1,牺牲高可用性。
消息消费初探
消息推送模式
消息消费重要方法
1 | void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName):发送消息确认 |
DefaultMQPushConsumer
1 | //消费者组 |
消费者启动流程
代码:DefaultMQPushConsumerImpl#start
1 | public synchronized void start() throws MQClientException { |
消息拉取
消息消费模式有两种模式:广播模式与集群模式。广播模式比较简单,每一个消费者需要拉取订阅主题下所有队列的消息。本文重点讲解集群模式。在集群模式下,同一个消费者组内有多个消息消费者,同一个主题存在多个消费队列,消费者通过负载均衡的方式消费消息。
消息队列负载均衡,通常的作法是一个消息队列在同一个时间只允许被一个消费消费者消费,一个消息消费者可以同时消费多个消息队列。
PullMessageService实现机制
从MQClientInstance的启动流程中可以看出,RocketMQ使用一个单独的线程PullMessageService来负责消息的拉取。
代码:PullMessageService#run
1 | public void run() { |
PullRequest
1 | private String consumerGroup; //消费者组 |
代码:PullMessageService#pullMessage
1 | private void pullMessage(final PullRequest pullRequest) { |
####2)ProcessQueue实现机制
ProcessQueue是MessageQueue在消费端的重现、快照。PullMessageService从消息服务器默认每次拉取32条消息,按照消息的队列偏移量顺序存放在ProcessQueue中,PullMessageService然后将消息提交到消费者消费线程池,消息成功消费后从ProcessQueue中移除。
属性
1 | //消息容器 |
方法
1 | //移除消费超时消息 |
消息拉取基本流程
客户端发起拉取请求
代码:DefaultMQPushConsumerImpl#pullMessage
1 | public void pullMessage(final PullRequest pullRequest) { |
消息服务端Broker组装消息
代码:PullMessageProcessor#processRequest
1 | //构建消息过滤器 |
代码:DefaultMessageStore#getMessage
1 | GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; |
代码:PullMessageProcessor#processRequest
1 | //根据拉取结果填充responseHeader |
消息拉取客户端处理消息
代码:MQClientAPIImpl#processPullResponse
1 | private PullResult processPullResponse( |
PullResult类
1 | private final PullStatus pullStatus; //拉取结果 |
代码:DefaultMQPushConsumerImpl$PullCallback#OnSuccess
1 | //将拉取到的消息存入processQueue |
消息拉取总结
消息拉取长轮询机制分析
RocketMQ未真正实现消息推模式,而是消费者主动向消息服务器拉取消息,RocketMQ推模式是循环向消息服务端发起消息拉取请求,如果消息消费者向RocketMQ拉取消息时,消息未到达消费队列时,如果不启用长轮询机制,则会在服务端等待shortPollingTimeMills时间后(挂起)再去判断消息是否已经到达指定消息队列,如果消息仍未到达则提示拉取消息客户端PULL—NOT—FOUND(消息不存在);如果开启长轮询模式,RocketMQ一方面会每隔5s轮询检查一次消息是否可达,同时一有消息达到后立马通知挂起线程再次验证消息是否是自己感兴趣的消息,如果是则从CommitLog文件中提取消息返回给消息拉取客户端,否则直到挂起超时,超时时间由消息拉取方在消息拉取是封装在请求参数中,PUSH模式为15s,PULL模式通过DefaultMQPullConsumer#setBrokerSuspendMaxTimeMillis设置。RocketMQ通过在Broker客户端配置longPollingEnable为true来开启长轮询模式。
代码:PullMessageProcessor#processRequest
1 | //当没有拉取到消息时,通过长轮询方式继续拉取消息 |
PullRequestHoldService方式实现长轮询
代码:PullRequestHoldService#suspendPullRequest
1 | //将拉取消息请求,放置在ManyPullRequest集合中 |
代码:PullRequestHoldService#run
1 | public void run() { |
代码:PullRequestHoldService#checkHoldRequest
1 | //遍历拉取任务 |
代码:PullRequestHoldService#notifyMessageArriving
1 | //如果拉取消息偏移大于请求偏移量,如果消息匹配调用executeRequestWhenWakeup处理消息 |
如果开启了长轮询机制,PullRequestHoldService会每隔5s被唤醒去尝试检测是否有新的消息的到来才给客户端响应,或者直到超时才给客户端进行响应,消息实时性比较差,为了避免这种情况,RocketMQ引入另外一种机制:当消息到达时唤醒挂起线程触发一次检查。
DefaultMessageStore$ReputMessageService机制
代码:DefaultMessageStore#start
1 | //长轮询入口 |
代码:DefaultMessageStore$ReputMessageService#run
1 | public void run() { |
代码:DefaultMessageStore$ReputMessageService#deReput
1 | //当新消息达到是,进行通知监听器进行处理 |
代码:NotifyMessageArrivingListener#arriving
1 | public void arriving(String topic, int queueId, long logicOffset, long tagsCode, |
消息队列负载与重新分布机制
RocketMQ消息队列重新分配是由RebalanceService线程来实现。一个MQClientInstance持有一个RebalanceService实现,并随着MQClientInstance的启动而启动。
代码:RebalanceService#run
1 | public void run() { |
代码:MQClientInstance#doRebalance
1 | public void doRebalance() { |
代码:RebalanceImpl#doRebalance
1 | //遍历订阅消息对每个主题的订阅的队列进行重新负载 |
代码:RebalanceImpl#rebalanceByTopic
1 | //从主题订阅消息缓存表中获取主题的队列信息 |
RocketMQ默认提供5中负载均衡分配算法
1 | AllocateMessageQueueAveragely:平均分配 |
注意:消息队列的分配遵循一个消费者可以分配到多个队列,但同一个消息队列只会分配给一个消费者,故如果出现消费者个数大于消息队列数量,则有些消费者无法消费消息。
消息消费过程
PullMessageService负责对消息队列进行消息拉取,从远端服务器拉取消息后将消息存储ProcessQueue消息队列处理队列中,然后调用ConsumeMessageService#submitConsumeRequest方法进行消息消费,使用线程池来消费消息,确保了消息拉取与消息消费的解耦。ConsumeMessageService支持顺序消息和并发消息,核心类图如下:
并发消息消费
代码:ConsumeMessageConcurrentlyService#submitConsumeRequest
1 | //消息批次单次 |
代码:ConsumeMessageConcurrentlyService$ConsumeRequest#run
1 | //检查processQueue的dropped,如果为true,则停止该队列消费。 |
定时消息机制
定时消息是消息发送到Broker后,并不立即被消费者消费而是要等到特定的时间后才能被消费,RocketMQ并不支持任意的时间精度,如果要支持任意时间精度定时调度,不可避免地需要在Broker层做消息排序,再加上持久化方面的考量,将不可避免的带来巨大的性能消耗,所以RocketMQ只支持特定级别的延迟消息。消息延迟级别在Broker端通过messageDelayLevel配置,默认为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,delayLevel=1表示延迟消息1s,delayLevel=2表示延迟5s,依次类推。
RocketMQ定时消息实现类为ScheduleMessageService,该类在DefaultMessageStore中创建。通过在DefaultMessageStore中调用load方法加载该类并调用start方法启动。
代码:ScheduleMessageService#load
1 | //加载延迟消息消费进度的加载与delayLevelTable的构造。延迟消息的进度默认存储路径为/store/config/delayOffset.json |
代码:ScheduleMessageService#start
1 | //遍历延迟队列创建定时任务,遍历延迟级别,根据延迟级别level从offsetTable中获取消费队列的消费进度。如果不存在,则使用0 |
调度机制
ScheduleMessageService的start方法启动后,会为每一个延迟级别创建一个调度任务,每一个延迟级别对应SCHEDULE_TOPIC_XXXX主题下的一个消息消费队列。定时调度任务的实现类为DeliverDelayedMessageTimerTask,核心实现方法为executeOnTimeup
代码:ScheduleMessageService$DeliverDelayedMessageTimerTask#executeOnTimeup
1 | //根据队列ID与延迟主题查找消息消费队列 |
顺序消息
顺序消息实现类是org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService
代码:ConsumeMessageOrderlyService#start
1 | public void start() { |
代码:ConsumeMessageOrderlyService#submitConsumeRequest
1 | //构建消息任务,并提交消费线程池中 |
代码:ConsumeMessageOrderlyService$ConsumeRequest#run
1 | //如果消息队列为丢弃,则停止本次消费任务 |
小结
RocketMQ消息消费方式分别为集群模式、广播模式。
消息队列负载由RebalanceService线程默认每隔20s进行一次消息队列负载,根据当前消费者组内消费者个数与主题队列数量按照某一种负载算法进行队列分配,分配原则为同一个消费者可以分配多个消息消费队列,同一个消息消费队列同一个时间只会分配给一个消费者。
消息拉取由PullMessageService线程根据RebalanceService线程创建的拉取任务进行拉取,默认每次拉取32条消息,提交给消费者消费线程后继续下一次消息拉取。如果消息消费过慢产生消息堆积会触发消息消费拉取流控。
并发消息消费指消费线程池中的线程可以并发对同一个消息队列的消息进行消费,消费成功后,取出消息队列中最小的消息偏移量作为消息消费进度偏移量存储在于消息消费进度存储文件中,集群模式消息消费进度存储在Broker(消息服务器),广播模式消息消费进度存储在消费者端。
RocketMQ不支持任意精度的定时调度消息,只支持自定义的消息延迟级别,例如1s、2s、5s等,可通过在broker配置文件中设置messageDelayLevel。
顺序消息一般使用集群模式,是指对消息消费者内的线程池中的线程对消息消费队列只能串行消费。并并发消息消费最本质的区别是消息消费时必须成功锁定消息消费队列,在Broker端会存储消息消费队列的锁占用情况。