RabbitMQ
RabbitMQ高频面试题(2022最新,建议收藏!) - 知乎 (zhihu.com)
什么是RabbitMQ?
RabbitMQ是一个由erlang开发的消息队列。消息队列用于应用间的异步协作。
RabbitMQ的组件
Message:由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key、priority、delivery-mode(是否持久性存储)等。
Publisher:消息的生产者。
Exchange:接收消息并将消息路由到一个或多个Queue。default exchange 是默认的直连交换机,名字为空字符串,每个新建队列都会自动绑定到默认交换机上,绑定的路由键名称与队列名称相同。
Binding:通过Binding将Exchange和Queue关联,这样Exchange就知道将消息路由到哪个Queue中。
Queue:存储消息,队列的特性是先进先出。一个消息可分发到一个或多个队列。
Virtual host:每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange和queue。
Broker:消息队列服务器实体。
什么时候使用MQ
对于一些不需要立即生效的操作,可以拆分出来,异步执行,使用消息队列实现。
以常见的订单系统为例,用户点击下单按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发短信通知。这种场景下就可以用 MQ 。将短信通知放到 MQ 异步执行,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ, 让主流程快速完结,而由另外的线程消费MQ的消息。
RabbitMQ的优缺点
缺点:使用erlang实现,不利于二次开发和维护;性能较kafka差,持久化消息和ACK确认的情况下生产和消费消息单机吞吐量大约在1-2万左右,kafka单机吞吐量在十万级别。
优点:有管理界面,方便使用;可靠性高;功能丰富,支持消息持久化、消息确认机制、多种消息分发机制。
RabbitMQ 有哪些重要的角色?
RabbitMQ 中重要的角色有:生产者、消费者和代理。
- 生产者:消息的创建者,负责创建和推送数据到消息服务器;
- 消费者:消息的接收方,用于处理数据和确认消息;
- 代理:就是 RabbitMQ 本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。
Exchange 类型
Exchange分发消息时根据类型的不同分发策略不同,目前共四种类型:direct、fanout、topic、headers 。headers 模式根据消息的headers进行路由,此外 headers 交换器和 direct 交换器完全一致,但性能差很多。
Exchange规则。
类型名称 |
类型描述 |
fanout |
把所有发送到该Exchange的消息路由到所有与它绑定的Queue中 |
direct |
Routing Key==Binding Key |
topic |
模糊匹配 |
headers |
Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的header属性进行匹配。 |
direct
direct交换机会将消息路由到binding key 和 routing key完全匹配的队列中。它是完全匹配、单播的模式。
fanout
所有发到 fanout 类型交换机的消息都会路由到所有与该交换机绑定的队列上去。fanout 类型转发消息是最快的。
topic
topic交换机使用routing key和binding key进行模糊匹配,匹配成功则将消息发送到相应的队列。routing key和binding key都是句点号“. ”分隔的字符串,binding key中可以存在两种特殊字符“”与“##”,用于做模糊匹配,其中“”用于匹配一个单词,“##”用于匹配多个单词。
headers
headers交换机是根据发送的消息内容中的headers属性进行路由的。在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
消息丢失
消息丢失场景:生产者生产消息到RabbitMQ Server消息丢失、RabbitMQ Server存储的消息丢失和RabbitMQ Server到消费者消息丢失。
消息丢失从三个方面来解决:生产者确认机制、消费者手动确认消息和持久化。
生产者确认机制
生产者发送消息到队列,无法确保发送的消息成功的到达server。
解决方法:
- 事务机制。在一条消息发送之后会使发送端阻塞,等待RabbitMQ的回应,之后才能继续发送下一条消息。性能差。
- 开启生产者确认机制,只要消息成功发送到交换机之后,RabbitMQ就会发送一个ack给生产者(即使消息没有Queue接收,也会发送ack)。如果消息没有成功发送到交换机,就会发送一条nack消息,提示发送失败。
在 Springboot 是通过 publisher-confirms 参数来设置 confirm 模式:
1 2 3 4
| spring: rabbitmq: publisher-confirms: true
|
在生产端提供一个回调方法,当服务端确认了一条或者多条消息后,生产者会回调这个方法,根据具体的结果对消息进行后续处理,比如重新发送、记录日志等。
1 2 3 4 5 6 7 8 9 10
| final RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> { log.info("correlationData: " + correlationData); log.info("ack: " + ack); if(!ack) { log.info("异常处理...."); } };
rabbitTemplate.setConfirmCallback(confirmCallback);
|
路由不可达消息
生产者确认机制只确保消息正确到达交换机,对于从交换机路由到Queue失败的消息,会被丢弃掉,导致消息丢失。
对于不可路由的消息,有两种处理方式:Return消息机制和备份交换机。
Return消息机制
Return消息机制提供了回调函数 ReturnCallback,当消息从交换机路由到Queue失败才会回调这个方法。需要将mandatory
设置为 true
,才能监听到路由不可达的消息。
1 2 3 4
| spring: rabbitmq: template.mandatory: true
|
通过 ReturnCallback 监听路由不可达消息。
1 2 3 4
| final RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) -> log.info("return exchange: " + exchange + ", routingKey: " + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText); rabbitTemplate.setReturnCallback(returnCallback);
|
当消息从交换机路由到Queue失败时,会返回 return exchange: , routingKey: MAIL, replyCode: 312, replyText: NO_ROUTE
。
备份交换机
备份交换机alternate-exchange 是一个普通的exchange,当你发送消息到对应的exchange时,没有匹配到queue,就会自动转移到备份交换机对应的queue,这样消息就不会丢失。
消费者手动消息确认
有可能消费者收到消息还没来得及处理MQ服务就宕机了,导致消息丢失。因为消息者默认采用自动ack,一旦消费者收到消息后会通知MQ Server这条消息已经处理好了,MQ 就会移除这条消息。
解决方法:消费者设置为手动确认消息。消费者处理完逻辑之后再给broker回复ack,表示消息已经成功消费,可以从broker中删除。当消息者消费失败的时候,给broker回复nack,根据配置决定重新入队还是从broker移除,或者进入死信队列。只要没收到消费者的 acknowledgment,broker 就会一直保存着这条消息,但不会 requeue,也不会分配给其他 消费者。
消费者设置手动ack:
1 2
| ##设置消费端手动 ack spring.rabbitmq.listener.simple.acknowledge-mode=manual
|
消息处理完,手动确认:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @RabbitListener(queues = RabbitMqConfig.MAIL_QUEUE) public void onMessage(Message message, Channel channel) throws IOException {
try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, true); System.out.println("mail listener receive: " + new String(message.getBody())); }
|
当消息消费失败时,消费端给broker回复nack,如果consumer设置了requeue为false,则nack后broker会删除消息或者进入死信队列,否则消息会重新入队。
持久化
如果RabbitMQ服务异常导致重启,将会导致消息丢失。RabbitMQ提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启RabbitMQ,消息也不会丢失。
消息持久化需要满足以下条件:
- 消息设置持久化。发布消息前,设置投递模式delivery mode为2,表示消息需要持久化。
- Queue设置持久化。
- 交换机设置持久化。
当发布一条消息到交换机上时,Rabbit会先把消息写入持久化日志,然后才向生产者发送响应。一旦从队列中消费了一条消息的话并且做了确认,RabbitMQ会在持久化日志中移除这条消息。在消费消息前,如果RabbitMQ重启的话,服务器会自动重建交换机和队列,加载持久化日志中的消息到相应的队列或者交换机上,保证消息不会丢失。
镜像队列
当MQ发生故障时,会导致服务不可用。引入RabbitMQ的镜像队列机制,将queue镜像到集群中其他的节点之上。如果集群中的一个节点失效了,能自动地切换到镜像中的另一个节点以保证服务的可用性。
通常每一个镜像队列都包含一个master和多个slave,分别对应于不同的节点。发送到镜像队列的所有消息总是被直接发送到master和所有的slave之上。除了publish外所有动作都只会向master发送,然后由master将命令执行的结果广播给slave,从镜像队列中的消费操作实际上是在master上执行的。
消息重复消费怎么处理?
消息重复的原因有两个:1.生产时消息重复,2.消费时消息重复。
生产者发送消息给MQ,在MQ确认的时候出现了网络波动,生产者没有收到确认,这时候生产者就会重新发送这条消息,导致MQ会接收到重复消息。
消费者消费成功后,给MQ确认的时候出现了网络波动,MQ没有接收到确认,为了保证消息不丢失,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。由于重复消息是由于网络原因造成的,无法避免。
解决方法:发送消息时让每个消息携带一个全局的唯一ID,在消费消息时先判断消息是否已经被消费过,保证消息消费逻辑的幂等性。具体消费过程为:
- 消费者获取到消息后先根据id去查询redis/db是否存在该消息
- 如果不存在,则正常消费,消费完毕后写入redis/db
- 如果存在,则证明消息被消费过,直接丢弃
消费端怎么进行限流?
当 RabbitMQ 服务器积压大量消息时,队列里的消息会大量涌入消费端,可能导致消费端服务器奔溃。这种情况下需要对消费端限流。
Spring RabbitMQ 提供参数 prefetch 可以设置单个请求处理的消息个数。如果消费者同时处理的消息到达最大值的时候,则该消费者会阻塞,不会消费新的消息,直到有消息 ack 才会消费新的消息。
开启消费端限流:
1 2
| spring.rabbitmq.listener.simple.prefetch=2
|
原生 RabbitMQ 还提供 prefetchSize 和 global 两个参数。Spring RabbitMQ没有这两个参数。
1 2 3
|
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
|
什么是死信队列?
消费失败的消息存放的队列。
消息消费失败的原因:
- 消息被拒绝并且消息没有重新入队(requeue=false)
- 消息超时未消费
- 达到最大队列长度
设置死信队列的 exchange 和 queue,然后进行绑定:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Bean public DirectExchange dlxExchange() { return new DirectExchange(RabbitMqConfig.DLX_EXCHANGE); }
@Bean public Queue dlxQueue() { return new Queue(RabbitMqConfig.DLX_QUEUE, true); }
@Bean public Binding bindingDeadExchange(Queue dlxQueue, DirectExchange deadExchange) { return BindingBuilder.bind(dlxQueue).to(deadExchange).with(RabbitMqConfig.DLX_QUEUE); }
|
在普通队列加上两个参数,绑定普通队列到死信队列。当消息消费失败时,消息会被路由到死信队列。
1 2 3 4 5 6 7 8
| @Bean public Queue sendSmsQueue() { Map<String,Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange", RabbitMqConfig.DLX_EXCHANGE); arguments.put("x-dead-letter-routing-key", RabbitMqConfig.DLX_QUEUE); return new Queue(RabbitMqConfig.MAIL_QUEUE, true, false, false, arguments); }
|
生产者完整代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| @Component @Slf4j public class MQProducer {
@Autowired RabbitTemplate rabbitTemplate;
@Autowired RandomUtil randomUtil;
@Autowired UserService userService;
final RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> { log.info("correlationData: " + correlationData); log.info("ack: " + ack); if(!ack) { log.info("异常处理...."); } };
final RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) -> log.info("return exchange: " + exchange + ", routingKey: " + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
public void sendMail(String mail) { Integer random = randomUtil.nextInt(100000, 999999); Map<String, String> map = new HashMap<>(2); String code = random.toString(); map.put("mail", mail); map.put("code", code);
MessageProperties mp = new MessageProperties(); Message msg = new Message("tyson".getBytes(), mp); msg.getMessageProperties().setExpiration("3000"); CorrelationData correlationData = new CorrelationData("1234567890"+new Date());
rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); rabbitTemplate.convertAndSend(RabbitMqConfig.MAIL_QUEUE, msg, correlationData);
userService.updateMailSendState(mail, code, MailConfig.MAIL_STATE_WAIT); } }
|
消费者完整代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Slf4j @Component public class DeadListener {
@RabbitListener(queues = RabbitMqConfig.DLX_QUEUE) public void onMessage(Message message, Channel channel) throws IOException {
try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag,false); System.out.println("receive--1: " + new String(message.getBody())); } }
|
当普通队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的死信交换机去,然后被路由到死信队列。可以监听死信队列中的消息做相应的处理。
说说pull模式
pull模式主要是通过channel.basicGet方法来获取消息,示例代码如下:
1 2 3
| GetResponse response = channel.basicGet(QUEUE_NAME, false); System.out.println(new String(response.getBody())); channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
|
怎么设置消息的过期时间?
在生产端发送消息的时候可以给消息设置过期时间,单位为毫秒(ms)
1 2
| Message msg = new Message("tyson".getBytes(), mp); msg.getMessageProperties().setExpiration("3000");
|
也可以在创建队列的时候指定队列的ttl,从消息入队列开始计算,超过该时间的消息将会被移除。
RocketMQ
投递消息的策略
生产者(Producer)
2.1 默认投递方式:基于Queue队列
轮询算法投递
默认情况下,采用了最简单的轮询算法,这种算法有个很好的特性就是,保证每一个Queue队列
的消息投递数量尽可能均匀.
2.2 默认投递方式的增强:基于Queue队列轮询算法和消息投递延迟最小的策略投递
默认的投递方式比较简单,但是也暴露了一个问题,就是有些Queue队列可能由于自身数量积压等原因,可能在投递的过程比较长,对于这样的Queue队列会影响后续投递的效果。
基于这种现象,RocketMQ在每发送一个MQ消息后,都会统计一下消息投递的时间延迟,根据这个时间延迟,可以知道往哪些Queue队列投递的速度快。
在这种场景下,会优先使用消息投递延迟最小的策略,如果没有生效,再使用Queue队列轮询的方式。
2.3 顺序消息的投递方式
上述两种投递方式属于对消息投递的时序性没有要求的场景,这种投递的速度和效率比较高。而在有些场景下,需要保证同类型消息投递和消费的顺序性。
基于上述的情况,RockeMQ
采用了这种实现方案:对于相同订单号的消息,通过一定的策略,将其放置在一个 queue队列中
,然后消费者
再采用一定的策略(一个线程独立处理一个queue
,保证处理消息的顺序性),能够保证消费的顺序性
如何为消费者分配queue队列
?
RocketMQ对于消费者消费消息有两种形式:
BROADCASTING
:广播式消费,这种模式下,一个消息会被通知到每一个消费者
CLUSTERING
: 集群式消费,这种模式下,一个消息最多只会被投递到一个消费者
上进行消费
模式如下:
对于使用了消费模式为MessageModel.CLUSTERING进行消费时,需要保证一个消息在整个集群中只需要被消费一次。实际上,在RoketMQ底层,消息指定分配给消费者的实现,是通过queue队列分配给消费者的方式完成的:也就是说,消息分配的单位是消息所在的queue队列。即:
将queue队列指定给特定的消费者后,queue队列内的所有消息将会被指定到消费者进行消费。
RocketMQ定义了策略接口AllocateMessageQueueStrategy,对于给定的消费者分组,和消息队列列表、消费者列表,当前消费者应当被分配到哪些queue队列
算法名称 含义
AllocateMessageQueueAveragely 平均分配算法
AllocateMessageQueueAveragelyByCircle 基于环形平均分配算法
AllocateMachineRoomNearby 基于机房临近原则算法
AllocateMessageQueueByMachineRoom 基于机房分配算法
AllocateMessageQueueConsistentHash 基于一致性hash算法
AllocateMessageQueueByConfig 基于配置分配算法
Producer代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| @Slf4j @Component public class MQProducerService {
@Value("${rocketmq.producer.send-message-timeout}") private Integer messageTimeOut;
private static final String topic = "RLT_TEST_TOPIC"; @Autowired private RocketMQTemplate rocketMQTemplate;
public void send(User user) { rocketMQTemplate.convertAndSend(topic + ":tag1", user);
}
public SendResult sendMsg(String msgBody) { SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build()); log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult)); return sendResult; }
public void sendAsyncMsg(String msgBody) { rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { } @Override public void onException(Throwable throwable) { } }); }
public void sendDelayMsg(String msgBody, int delayLevel) { rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel); }
public void sendOneWayMsg(String msgBody) { rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).build()); }
public SendResult sendTagMsg(String msgBody) { return rocketMQTemplate.syncSend(topic + ":tag2", MessageBuilder.withPayload(msgBody).build()); } }
|
Consumer代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| @Slf4j @Component public class MQConsumerService {
@Service @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1", consumerGroup = "Con_Group_One") public class ConsumerSend implements RocketMQListener<User> { @Override public void onMessage(User user) { log.info("监听到消息:user={}", JSON.toJSONString(user)); } }
@Service @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", consumerGroup = "Con_Group_Two") public class ConsumerSend2 implements RocketMQListener<String> { @Override public void onMessage(String str) { log.info("监听到消息:str={}", str); } }
@Service @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag2", consumerGroup = "Con_Group_Three") public class Consumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { byte[] body = messageExt.getBody(); String msg = new String(body); log.info("监听到消息:msg={}", msg); } }
}
|
///
Broker
生产者
写入分区策略
消费者
读取分区策略
文件夹
~/store
1 2 3 4 5 6 7 8
| abort:该文件在Broker启动后会自动创建,正常关闭Broker,该文件会自动消失。若在没有启动 Broker的情况下,发现这个文件是存在的,则说明之前Broker的关闭是非正常关闭。 checkpoint:其中存储着commitlog、consumequeue、index文件的最后刷盘时间戳 commitlog:其中存放着commitlog文件,而消息是写在commitlog文件中的 conæg:存放着Broker运行期间的一些配置数据 consumequeue:其中存放着consumequeue文件,队列就存放在这个目录中 index:其中存放着消息索引文件indexFile lock:运行期间使用到的全局资源锁
|
~/store/commitLog
别称:MapperFile
存储消息信息 = Message + Queue + PublishHost
在消息写入时
1 2 3 4 5 6 7
| 一条消息进入到Broker后经历了以下几个过程才最终被持久化。 1.Broker根据queueId,获取到该消息对应索引条目要在consumequeue目录中的写入偏移量,即 QueueOffset 2.将queueId、queueOffset等数据,与消息一起封装为消息单元 3.将消息单元写入到commitlog 4.同时,形成消息索引条目 5.将消息索引条目分发到相应的consumequeue
|
~/store/consumerQueue
topic/queueId/comsumerQueue
消息索引:commitLog Offset,Message Length, MessageTag(hash)
1
| 了提高效率,会为每个Topic在~/store/consumequeue中创建一个目录,目录名为Topic名称。在该Topic目录下,会再为每个该Topic的Queue建立一个目录,目录名为queueId。每个目录中存放着若干consumequeue文件,consumequeue文件是commitlog的索引文件,可以根据consumequeue定位到具体的消息。
|
1 2 3
| 备注: RocketMQ中的commitlog目录与consumequeue的结合就类似于Kafka中的partition分区目录。 mappedFile文件就类似于Kafka中的segment段。
|
文件的读写
消息写入
1 2 3 4
| 一条消息进入到Broker后经历了以下几个过程才最终被持久化。 1.Broker根据queueId,获取到该消息对应索引条目要在consumequeue目录中的写入偏移量,即QueueOffset 2.将queueId、queueOffset等数据,与消息一起封装为消息单元将消息单元写入到commitlog 3.同时,形成消息索引条目将消息索引条目分发到相应的consumequeue
|
消息拉取
1 2 3 4 5 6 7 8
| 当Consumer来拉取消息时会经历以下几个步骤: 1.Consumer获取到其要消费消息所在Queue的消费偏移量offset,计算出其要消费消息的消息offset (消费offset即消费进度,consumer对某个Queue的消费offset,即消费到了该Queue的第几条消息) 消息offset = 消费offset + 1 2.Consumer向Broker发送拉取请求,其中会包含其要拉取消息的Queue、消息offset及消息Tag。 3.Broker计算在该consumequeue中的queueOffset。queueOffset = 消息offset * 20字节从该queueOffset处开始向后查找第一个指定Tag的索引条目。 4.解析该索引条目的前8个字节,即可定位到该消息在commitlog中的commitlog offset从对应commitlog offset中读取消息单元,并发送给Consumer
|
性能提升
首先,RocketMQ对文件的读写操作是通过mmap零拷贝进行的,将对文件的操作转化为直接对内存地 址进行操作,从而极大地提高了文件的读写效率。
其次,consumequeue中的数据是顺序存放的,还引入了PageCache的预读取机制,使得对 consumequeue文件的读取几乎接近于内存读取,即使在有消息堆积情况下也不会影响性能