MQ 的本质
MQ 本质都是[一发一存一消费]
生产者先将消息投递到一个叫做[队列]的容器中,队列存储消息之后,再转发给消费者
上面这个图便是消息队列最原始的模型,它包含了两个关键词:队列和消息
消息:就是要传输的数据,可以是最简单的文本字符串,也可以是自定义的复杂格式(只要能按照预定的格式解析出来)
队列:先进先出的数据结构。它是存放消息的容器,消息从队尾入队,从队首出队,入队即发消息的过程,出队即消费消息的过程
原始模型的进化
现在流行的消息队列都在原始的模型上做了拓展,同时提出了一些新名词,比如:主题(Topic)、分区(partition)等
队列模型
最初的消息队列就是原始模型,它是一个严格意义上的队列(Queue)。消息按照什么顺序写进去,就按照什么顺序读出来。不过,队列没有“读”这个操作,读就是出队,从队首删除消息
这便是队列模型:它允许多个生产者往同一个队列发送消息。但是如果有多个消费者,实际上是竞争关系,也就是一条消息只能被一个消费者收到,读完即删除。
发布-订阅模型
如果需要将一份消息数据发送给多个消费者,并且每个消费者都要求接收到全量的数据。很显然队列模型无法满足这个需求
一个可行的方案是:为每个消费者创建一个单独的队列,让生产者发送多份。这种做法比较笨,而且同一份数据被复制多份,也很浪费空间
为了解决这个问题,就演化出另外一种消息模型:发布-订阅模型
在发布-订阅模型中,存放消息的容器变成了“主题”,订阅者在接收消息之前需要先“订阅主题”。最终,每个订阅者都可以接收到同一个主题的全量消息
仔细对比下它和“队列模型”的异同:生产者就是发布者,队列就是主题,消费者就是订阅者,无本质区别。唯一的不同点在于:一份消息数据是否可以被多次消费
小结
上面两种模型就是“单播和广播的区别”。而且,当发布-订阅模型中只有一个订阅者时,它和队列模型就一样了。因此在功能上是完全兼容队列模型的
这也解释了为什么现在主流的消息中间件如 RocketMq、Kafka 都是直接基于发布-订阅模型实现的。此外,RabbitMQ 中之所以有一个 Exchange 模块,其实也是为了解决消息的投递问题,可以变相实现发布-订阅模型
包括大家接触到的“消费组”、“集群消费”、“广播消费”这些概念,都和上面这两种模型相关,以及在应用层面大家最常见的情形:组间广播、组内单播,也属于此范畴
透过模型看 MQ 的应用场景
目前,MQ 的应用场景非常多:系统解耦、异步通信、流量削峰、延迟通知、最终一致性保证、顺序消息和流式处理等
MQ 消息模型的适配性很广
MQ 消费模型主要解决的是:生产者和消费者的通信问题
通过对比,能够很明显地看出两点差异:
1.引入 MQ 后,由之前的 1 次 RPC 变成了现在的 2 次 RPC,而且生产者只跟队列耦合,它根本无需知道消费者的存在
2.多了一个中间节点[队列]进行消息存储,相当于同步变成了异步
再反过来思考 MQ 的应用场景,就不难理解为什么 MQ 适用了。因为这些场景无外乎都应用到了上面的两个特性
举个例子:比如说电商业务中最常见的[订单支付]场景,在订单支付成功后,需要更新订单状态、更新用户积分、通知商家有新订单、更新推荐系统中的用户画像等
引入 MQ 之后,订单系统现在只需要关注它最关心的流程:更新订单状态,即可。其他事情全部交给 MQ 来通知。这便是 MQ 解决的最核心的问题:系统解耦
改造前订单系统依赖 3 个外部系统,改造后仅仅依赖 MQ,而且后续业务再拓展(比如:营销系统打算针对支付用户奖励优惠券),也不涉及订单系统的改动,从而保证了核心流程的稳定性,降低了维护成本
这个改造还带来另外一个好处:因为 MQ 的引入,更新用户积分、通知商家、更新用户画像这些步骤全部变成了异步执行,能减少订单支付的整体耗时,提升订单系统的吞吐量。这便是 MQ 的另外一个典型应用场景:异步通信
除此之外,由于队列能转储消息,对于超出系统承载能力的场景,可以用 MQ 作为“漏斗”进行限流保护,即所谓的流量削峰
我们还可以用队列本身的顺序性,来满足消息必须按顺序投递的需求。利用队列+定时任务来实现消息的延时消费等
如何设计一个 MQ
1.MQ 的雏形
如果只是实现一个很粗糙的 MQ,不考虑生产环境的要求,应该如何设计呢?
MQ 最核心的功能需求:一发一存一消费。另外从技术维度来看 MQ 的通信模型,可以理解成:两次 RPC+消息转储
1.直接利用成熟的 RPC 框架(dubbo 或者 Thrift),实现两个接口:发消息和读消息
2.消息放在本地内存即可。数据结构可以用 JDK 自带的 ArrayBlockingQueue
2.写一个适用于生产环境的 MQ
1.关键点
假如我们还是只考虑最基础的功能:发消息、存消息、消费消息(支持发布-订阅模式)
那么在生产环境中,这些基础功能将会面临以下问题:
- 1.高并发场景下,如何保证收发消息的性能
- 2.如何保证消息服务的高可用和高可靠
- 3.如何保证服务是可以水平任意拓展的
- 4.如何保证消息存储也是水平可拓展的
- 5.各种元数据(比如集群中的各个节点、主题、消费关系等)如何管理,需不需要考虑数据的一致性
可见,如何解决高并发场景下满足高性能、高可靠等非功能性需求,才是这个问题的关键所在
2.整体设计思路
整体架构会涉及三类角色:
另外,将[一发一存一消费]这个核心流程进一步细化后,比较完整的数据流如下:
基于上面两个图,我们可以很快明确出 3 类角色的作用,分别如下:
1.Broker(服务端):MQ 中最核心的部分,是 MQ 的服务端,核心逻辑几乎全在这里,它为生产者和消费者提供 RPC 接口,负责消息的存储、备份和删除,以及维护消费关系等
2.Producer(生产者):MQ 的客户端之一,调用 Broker 提供给的 RPC 接口发送消息
3.Consumer(消费者):MQ 的另外一个客户端,调用 Broker 提供的 RPC 接口接收消息,同时完成消息确认
3.详细设计
再展开讨论下一些具体的技术难点和可行的解决方案
难点 1:RPC 通信
解决的是 Broker 与 Producer 以及 Consumer 之间的通信问题。如果不重复造轮子,直接利用成熟的 RPC 框架如 dubbo、Thrift 实现即可,这样不需要考虑服务注册与发现、负载均衡、通信协议、序列化方式等一系列问题
当然,也可以基于 Netty 来做底层通信,用 Zookeeper、Eureka 等来做注册中心,然后自定义一套新的通信协议(类似 kafka)。也可以基于 AMQP 这种标准化的 MQ 协议来做实现(类似 RabbitMQ)。对比直接用 RPC 框架,这种方案的定制化能力和优化空间更大
难点 2:高可用设计
高可用主要涉及两个方面:Broker 服务的高可用、存储方案的高可用
Broker 服务的高可用,只需要保证 Broker 可水平拓展及集群部署即可。进一步通过服务自动注册与发现、负载均衡、超时重试机制、发送和消费时的 ack 机制来保证
存储方案的高可用有两个思路:
参考 kafka 的分区+多副本模式,但是需要考虑分布式场景下数据复制和一致性方案(类似 ZAB、Raft 等协议),并实现自动故障转移
还可以用主流的 DB、分布式文件系统、带持久化能力的 KV 系统,他们都有自己的高可用方案
难点 3:存储设计
消息的存储方案是 MQ 的核心部分。可靠性保证已经在高可用设计中谈过了,可靠性要求不高的话直接用内存或者分布式缓存也可以。这里重点说一下存储的高性能如何保证,这个问题的决定因素在于存储结构的设计
目前主流的方案是:追加写日志文件(数据部分)+索引文件的方式(很多主流的开源 MQ 都是这种方式)。索引设计上可以考虑稠密索引或者稀疏索引,查找消息可以利用跳跃表、二分查找等,还可以通过操作系统的页缓存、零拷贝等技术来提升磁盘文件的读写性能
如果不追求很高的性能,也可以考虑现成的分布式文件系统、KV 存储或者数据库方案
难点 4:消费关系管理
为了支持发布-订阅的广播模式,Broker 需要知道每个主题都有哪些 Consumer 订阅了,基于这个关系进行消息投递
由于 Broker 是集群部署的,所以消费关系通常维护在公共存储上,可以基于 zookeeper、apollo 等配置中心来管理以及进行变更通知
难点 5:高性能设计
存储的高性能已经谈过了,当然还可以从其他方面进一步优化性能
比如 Reactor 网络 IO 模型、业务线程池的设计、生产端的批量发送、Broker 端的异步刷盘、消费端的批量拉取等
4.小结
要回答好如何设计一个 MQ
需要从功能性需求(收发消息)和非功能性需求(高性能、高可用、高拓展等)两方面入手
功能性需求不是重点,能覆盖 MQ 最基础的功能即可,至于延时消息、事务消息、重试队列等高级特性只是锦上添花的东西
最核心的是:能结合功能性需求,理清楚整体的思路,然后顺着这个思路取考虑非功能性的诉求如何满足,这才是技术难点所在
消息队列的非幂等问题
幂等性概念
所谓幂等性就是无论多少次操作都和第一次的操作结果一样。如果消息被多次消费,很有可能造成数据的不一致。而如果消息不可避免地被消费多次,如果我们开发人员能通过技术手段保证数据的前后一致性,那也是可以接接受的
场景分析
RabbitMQ、RocketMQ、Kafka 等消息中间件都有可能出现消息重复消费的问题。这种问题并不是 MQ 自己保证的,而是需要开发人员来保证
这几款消息队列都考虑了消息的幂等性,以 Kafka 为例:
Kafka 有一个偏移量的概念,代表着消息的序号,每条消息写到消息队列都会有一个偏移量,消费者消费了数据之后,每过一段固定的时间,就会把消费过的消息的偏移量提交一下,表示已经消费过了,下次消费就从偏移量后面开始消费。
问题:当消费完消息后,还没来得及提交偏移量,系统就被关机了,那么未提交偏移量的消息则会再次被消费
如下图所示,队列中的数据 A、B、C 对应的偏移量分别未 100、101、102,都被消费者消费了,但是只有数据 A 的偏移量 100 提交成功,另外 2 个偏移量因系统重启而导致未及时提交。
重启后,消费者又是拿偏移量 100 以后的数据,从偏移量 101 开始消费消息。所以数据 B 和数据 C 被重复消费
幂等性解决方案
微信支付结果通知场景
- 微信官方文档上体到微信支付通知结果可能会推送多次,需要开发者自行保证幂等性。第一次我们可以直接修改订单状态(如:支付中->支付成功),第二次就根据订单状态来判断,如果不是支付中,则不进行订单处理逻辑
插入数据库场景
- 每次插入数据时,先检查下数据库中是否又这条数据的主键 id,如果有,则进行更新操作
写 redis 场景
- redis 的 set 操作天然具有幂等性,所以不用考虑 redis 写的问题
其他场景方案
- 生产者发送每条数据时,增加一个全局唯一 id,类似订单 id。每次消费时,先去 redis 查询是否有这个 id,如果没有,则进行正常处理消息,且将 id 存到 redis。如果查到有这个 id,说明之前消费过,则不要重复处理这条消息
消息队列消息丢失的问题
1.生产者存放消息的过程中丢失消息
生产者丢失消息解决方案
- 事务机制(不推荐,同步方式)
对于 RabbitMQ 来说,生产者发送数据之前开启 RabbitMQ 的事务机制 channel.txselect,如果消息没有进入队列,则生产者收到异常报错,并进行回滚 channel.txRollback,然后可以操作重试发送消息;如果收到了消息,则可以提交事务 channel.txCommit。但是这是一个同步的操作,会影响性能
- confirm 机制(推荐,异步方式)
可以通过 confirm 模式来解决同步机制的性能问题。每次生产者发送的消息都会被分配一个唯一的 id,如果写入到了 rabbitMQ 队列中,则 rabbitMQ 会回传一个 ack 消息,说明这个消息写入成功。如果 rabbitMQ 没能写入这个消息,则回调 nack 接口,说明需要重试发送消息。
也可以自定义超时时间+消息 id 来实现超时重试机制。但可能出现的问题是调用 ack 接口时失败了,所以会出现消息被发送两次的问题。这个时候就需要保证消费者消费消息的幂等性
事务模式和 confirm 模式的区别
事务模式是同步的,提交事务后会被阻塞直到提交事务成功
confirm 模式异步接收通知,但可能接收不到通知,需要考虑接收不到通知的场景
2.消息队列丢失消息
消息队列的消息可以放到内存中,或者将内存中的消息转储到硬盘(比如数据库)中,一般都是内存和硬盘都存有消息。如果只是放在内存中,那么当机器重启了,消息就全部丢失了。如果是硬盘中,则可能存在一种极端情况,就是将内存中的数据转换到硬盘的期间,消息队列出问题了,未能完成消息的持久化操作
消息队列丢失消息解决方案
创建 Queue 的时候将其设置为持久化
发送消息的时候将消息的 deliveryMode 设置为 2
开启生产者 confirm 模式,可以重试发送消息
3.消费者丢失消息
消费者刚拿到消息,还没有开始处理,进程因为异常退出了,消费者没有机会再次拿到消息
消费者丢失消息解决方案
关闭 RabbitMQ 的自动 ack,每次生产者将消息写入消息队列后,就自动回传一个 ack 给生产者
消费者处理完消息后再主动 ack,告诉消息队列消息处理完成
问题:这种主动 ack 有什么漏洞,如果主动 ack 的时候挂了,怎么办
可能会被再次消费,这个时候就需要幂等处理了
问题:如果这条消息一直被重复消费怎么办
需要加上重试次数的监控,如果超过一定次数则将消息丢失,记录到异常表或发送异常通知给管理人员
4.RabbitMQ 消息丢失总结
5.Kafka 消息丢失
场景
Kafka 的某个 broker 节点宕机了,重新选举 leader(写入的节点)。如果 leader 挂了,follower 还有些数据未同步完,则 follower 成为 leader 后,消息队列会丢失一部分数据
Kafka 消息丢失解决方案
给 topic 设置replication.factor参数,值必须大于 1,要求每个 partition 必须有至少 2 个副本
给 Kafka 服务端设置min.insyc.replicas必须大于 1,表示一个 leader 至少一个 follower 还跟自己保持联系
消息队列消息乱序的问题
坑:用户先下单成功,然后取消订单,如果顺序乱了,则最后数据库里面会有一条下单成功的订单
RabbitMQ 场景
生产者向消息队列按照顺序发送了 2 条消息,消息 1:增加数据 A;消息 2:删除数据 A
期望结果:数据 A 被删除
但是如果有两个消费者,消费顺序是:消息 2、消息 1。则最后结果是增加了数据 A
RabbitMQ 解决方案
将 Queue 进行拆分,创建多个内存 Queue,消息 1 和消息 2 进入同一个 Queue
创建多个消费者,每个消费者对应一个 Queue
Kafka 场景
创建了 topic,有 3 个 partition
创建一条订单记录,订单 id 作为 key,订单相关的消息都丢到同一个 partition 中,同一个生产者创建的消息,顺序是正确的
为了快速消费消息,会创建多个消费者去处理消息,而为了提高效率,每个消费者可能会创建多个线程来并行的去拿消息及处理消息,处理消息的顺序可能就乱了
Kafka 解决方案
解决方案和 RabbitMQ 类似,利用多个内存 Queue,每个线程消费一个 Queue
具有相同 Key 的消息,进入同一个 Queue
消息队列消息积压的问题
消息积压:消息队列里面有很多消息来不及消费
场景 1:消费端出了问题,比如消费者都挂了,没有消费者消费消息,导致消息在队列里面不断积压
场景 2:消费端出了问题,比如消费者消费的速度太慢了,导致消息不断积压
解决方案:
修复代码层面消费者的问题,确保后续消费速度恢复或尽可能加快消费的速度
停掉现有的消费者
临时建立好原先 5 倍数量的 Queue
临时建立好原先 5 倍数量的消费者
将堆积的消息全部转入临时的 Queue,用临时消费者消费这些 Queue
消费完成后恢复原有状态
消息队列消息过期失效的问题
坑:RabbitMQ 可以设置过期时间,如果消息超过一定时间还没有被消费者消费,会被 RabbitMQ 给清理掉。消息就丢失了
消息过期失效解决方案
准备好批量重导的程序
手动将消息闲时批量重导
消息队列队列写满
坑:当消息队列因消息积压导致的队列快写满,所以不能接收更多的消息了。生产者生产的消息将会被丢弃
解决方案:
判断哪些是无用的消息,RabbitMQ 可以进行 purge message 操作
如果是有用的消息,则需要将消息快速消费,将消息里面的内容转存到数据库
准备好程序将转存在数据库中的消息再次重导入到消息队列
显式重导消息到消息队列