RibbitMQ

基本架构

下图是RabbitMQ的基本架构

  1. 生产者(Producer):生产者是消息的发送方,负责产生发送消息到RabbitMQ。生产者通常将消息发送到交换机(Exchange).
  2. 交换机(Exchange):交换机是消息的分发中心,负责将接收到的消息路由到一个或多个队列中。它定义了消息的传递规则,可以根据规则将消息发送到一个或多个队列
    • 直连交换机(Direct Exchange):将消息路由到与消息中的路由键(Routing Key)完全匹配的队列
    • 主题交换机(Topic Exchange):根据通配符匹配路由键,将消息路由到一个或多个队列
    • 扇出交换机(Fanout Exchange):将消息广播到所有交换机绑定的队列,忽略路由键
    • 头部交换机(Header Exchange):根据消息头中的属性进行匹配,将消息路由到与消息匹配的队列
  3. 队列(Queue):队列是消息的存储区,用于存储生产者发送的消息。消息最终会被消费者从队列中取出并处理。每个队列都有一个名称,并且可以绑定到一个或多个交换机
  4. 消费者(Consumer):消费者是消息的接收方,负责从队列中获取消息并进行处理。消费者通过订阅队列来接收消息
  5. 绑定(Binding):绑定是交换机和队列之间的关联关系。生产者将消息发送到交换机,而队列通过绑定与交换机关联,从而接收到消息
  6. 虚拟主机(Virtual Host):虚拟主机是RabbitMQ的基本工作单元,每个虚拟主机拥有自己的独立用户、权限、交换机、队列等资源,完全隔离于其他虚拟主机
  7. 连接(Connection):连接是指生产者、消费者与RabbitMQ之间的网络连接。每个连接可以包含多个信道(Channel),每个信道是一个独立的会话通道,可以进行独立的消息传递

如何保证消息不丢失

丢失场景

  • 生产者发送消息到RabbitMQ服务的过程中出现消息丢失。可能是网络波动未收到消息,又或者是服务器宕机
  • RabbitMQ服务器消息持久化出现消息丢失。消息发送到RabbitMQ之后,未能及时存储完成持久化,RabbitMQ服务器出现宕机,消息出现丢失
  • 消费者拉取消息过程以及拿到消息出现消息丢失。RabbitMQ服务器获取到消息过程出现网络波动等问题出现消息丢失;消费者拿到消息后但是消费者未能正常消费,导致丢失,可能是消费者出现处理异常又或者是消费者宕机

针对上述三种消息丢失场景,RabbitMQ提供了相应的解决方案:

  • confirm消息确认机制(生产者)
  • 消息持久化机制(Broker)
  • ACK事务机制(消费者)

confirm消息确认机制

Confirm模式时RabbitMQ提供的一种消息可靠性保证机制。当生产者通过Confirm模式发送消息时,它会等待RabbitMQ的确认,确保消息已经被正确地投递到了指定的Exchange中

  • 消息正确投递到queue时,会返回ack
  • 没有正确投递到queue时,会返回nack。如果exchange没有绑定queue,也会出现消息消失

消息持久化机制

持久化机制是指消息存储到磁盘,以保证RabbitMQ服务器宕机或者重启时,消息不会丢失

使用方法:

  • 生产者通过将消息的delivery_mode属性设置为2,将消息标记为持久化
  • 队列也需要进行持久化设置,确保队列在RabbitMQ服务器重启后仍然存在。经典队列需要将durable属性设置为true。而仲裁队列和流式队列默认必须持久化保存

注意事项:

  • 持久化机制影响性能,因此在需要保证确保消息不丢失地场景下使用

ACK事务机制

ACK事务机制用于确保消息被正确消费。当消息被消费者成功处理后,消费者确认(ACK)给RabbitMQ,告知消息可以被移除。这个过程是自动处理地,也可以关闭进行手工ACK

使用方法:

  • 在RabbitMQ中,ACK机制默认是开启的。当消息被消费者接收后,会立即从队列中删除,除非消费者发生异常
  • 可以手动开启ACK机制,通过将auto_ack参数设置为False,手动控制消息的ACK

注意事项:

  • ACK机制可以确保消息不会被重复处理,但如果消费者发生异常或者未发送ACK,消息可能会被重复投递

如何保证不被重复消费

重复消费场景

  • 生产者:生产者可能会重复推送一条消息到MQ中,比如Controller接口被调用了2次,没有做接口幂等性导致的
  • MQ:在消费者消费完准备响应ack消息消费成功时,MQ突然挂掉了,导致MQ以为消费者还未消费该条数据,MQ恢复后再次推送了该条消息,导致了重复消费
  • 消费者:消费者已经消费完消息,正准备但是还未响应ack消息时,此时消费者挂了,服务重启后MQ以为消费者还没有消费该消息,再次推送了该消息,再次推送了该消息

解决方案

数据库唯一键约束

缺点:局限性很大,仅仅只能用在我们数据新增场景,并且性能比较低

使用乐观锁

假设是更新订单状态,在发送的消息的时候带上修改字段的版本号

缺点:如果说跟新字段比较多,并且更新场景比较多,可能会导致数据库字段增加并且还有可能出现多条消息同时在队列中此时他们修改字段版本号一致,排在后续的消息无法被消费

如何解决消费堆积问题