Message Queue消息队列——RabbitMQ
和队列 Queue 类似,不过消息队列是在分布式情况下,每个服务从一个消息队列中取出东西。消息保存到消息队列中,别的服务从消息中间件中取消息。
一、应用场景
异步、削峰、解耦
1.异步
例如用户注册为例
图 1:需要将用户信息写入到数据库中,然后发送注册邮件和注册短信,如果按照图 1 的方式顺序进行就会响应达到 150ms;
图 2:如果使用 CompletableFuture 进行异步编排,将发送注册邮件和发送注册信息进行并行处理,等待两者都完成就进行返回,则响应可以缩减到 100ms
图 3:其实我们并不关心发送注册邮件和短信是否成功,即使有时候是不成功的状态也是可以理解的,只要用户重复几次就行。只要将注册信息写入数据库,然后将消息写入到消息队列中,就立即返回,延迟就只需要 55ms,发送注册邮件和发送注册短信会由其他微服务从消息队列中取出消息进行处理,这样就可以将延迟进行进一步缩短。
2.解耦
当服务之间进行相互调用时,例如订单服务下单时需要调用库存服务进行出库操作,但是库存系统经常进行升级,导致接口参数经常发生变化,如果是使用 Feign 进行调用,当库存系统进行升级了,订单系统中的 Feign 接口就会发生变动。如果使用消息队列就可以进行解耦,当订单系统下订单后向消息队列中写入消息:某某下了一个订单,购置商品 xxx 几件,然后库存系统从消息队列中取出消息即可,这样就进行了解耦。
3.削峰
在秒杀构成中,可以利用消息队列进行流量削峰,秒杀发出的用户请求写入到消息队列中,后端服务集群按顺序取出消息队列中的消息,可以避免大量请求挤兑后端服务,造成崩溃。
二、概述
点对点式:消息生产者将发送消息,将消息放入消息队列中,消息接收者可以有很多,通过监听队列,当队列中有消息内容时,就从消息队列中获取消息内容。
特点:消息具有唯一的发送者和接受者,但并不是只有一个接收者。(也就是消息只能被一个人来消费,但是可以有很多人来监听消息队列的动静,就和猪槽里喂饲料一样,一对一)
发布订阅模式:消息发送者发送消息给消息队列,然后消息队列将消息发布到主题中,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息。
特点:可以有很多接受者,一对多
消息队列协议:
- JMS(JAVA Message Service)JAVA 消息服务:基于 JVM 消息代理的规范,ActiveMQ、HornetMQ 是 JMS 实现。定义了通用的 Java api 接口
- AMQP:RabbitMQ 是 AMQP 的实现,是一个消息代理的规范。
三、RabbitMQ 概念
生产者Publisher生产消息,发送的消息有消息头和消息体组成,其中消息体中最重要的是 route-key(路由键),消息发送给服务器,这个消息中间件服务器叫做Broker,生产者产生消息后会将消息通过Exchange 交换器路由给服务器中对应的队列,Exchange 交换器负责接收消息并进行消息分发;队列是用来存储消息;交换器与哪个队列进行绑定,是通过Binding进行消息队列和交换器之间的关联,一个绑定就是基于路由键将交换器和消息队列连接起来。消息消费者 Consumerr 和消息生产者 Publicsher 都是通过 Connection 与消息服务器建立连接进行通信,但是消费者有很多类型,例如用户、库存、订单等,一个客户端只会建立一条长连接,一条连接可以和多个队列进行连接,是通过 NIO 一条连接建立多个 Channel 信道,一条长连接可以有很多 Channel,消息的内容是通过信道来进行收发消息。虚拟主机就类似 Docker 中容器,表示一批交换器、消息队列和相关对象,一台服务器可以有多个虚拟主机,不同虚拟主机之间是相互隔离的。
四、Docker 安装 RabbitMQ
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
默认登陆用户密码都是 guest
五、细节概念
1.交换机 Exchange 类型
- direct:直接,精确匹配 route-key 转发;单播类型(一个)
- fanout:扇出;广播类型(全部)
- topic:主题(自定义)
- headers:匹配消息头而不是路由键,几乎不用到。与 direct 完全一致,但是性能会差很多
2.自定义交换机
1)定义一个交换机
2)创建一个队列
3)交换机绑定队列
六、整合 RabbitMQ
1.引入 spring-boot-starter-amqp
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
给容器中自动配置了 RabiitAutoConfiguration
2.配置
spring:
rabbitmq:
host: 192.168.2.200
port: 5672
virtual-host: /
3.使用 AmqpAdmin 创建 Exchagne/Queue/Binding
@Autowired
AmqpAdmin amqpAdmin;
/**
* 使用AmqpAdmin创建Exchange
*/
@Test
public void testCreateExchange(){
/**
* public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
*/
DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false, null);
amqpAdmin.declareExchange(directExchange);
log.info("Exchange:{}创建成功", directExchange.getName());
}
/**
* 使用AmqpAdmin创建Queue
*/
@Test
public void testCreateQueue(){
/**
* 队列名称、是否持久化、是否排他(只能独占一条连接)、是否自动删除、参数
* public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
*/
Queue queue = new Queue("hello-java-queue", true, false, false, null);
amqpAdmin.declareQueue(queue);
log.info("queue:{}创建成功", queue.getName());
}
/**
* 使用AmqpAdmin创建Binding,连接Exchange和Queue
*/
@Test
public void testCreateBinding(){
/**
* 目的地、目的地类型、交换机名称、路由键、参数
* 将exchange指定的交换机和destination目的地进行绑定,使用routingKey作为指定的路由键
* public Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)
*/
Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange", "hello.java", null);
amqpAdmin.declareBinding(binding);
log.info("Binding创建成功");
}
4.测试发送消息
可以将字符串或者对象作为消息发送,其中作为对象发送默认是使用 java 的序列化方式,为此对象必须实现 Serializable 接口;此外也可以向容器中注入 Jackson2JsonMessageConverter 将对象转为 JSON 字符串形式作为消息发送出去
@Configuration
public class MyRabbitConfig {
/**
* 注入JSON消息转化器
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 测试发送消息
*/
@Test
public void sendMessageTest(){
/**
* 交换机名称、路由键、传递对象会自动序列化
* public void convertAndSend(String exchange, String routingKey, Object object)
*/
String msg ="hello world";
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", msg);
log.info("消息{}已发送完成", msg);
/**
* 复杂对象作为消息发送,会使用序列化机制将对象写出去,要求对象必须实现Serializable
* 可以在容器中注入MessageConverter将对象序列化为JSON
*/
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setName("test");
reasonEntity.setCreateTime(new Date());
reasonEntity.setStatus(1);
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", reasonEntity);
}
5.接收消息
接收消息本质上就是监听队列,当队列中有消息时,就从队列中取出消息
启动类上使用**@EnableRabbit**开启监听消息
@RabbitListener 必须在容器中使用,标记在类或者方法上
1.@RabbitListener 标记在方法上时,当监听的队列中有消息时会自动的执行该方法
方法可以添加的参数
- 可以写成原生消息类型 Message,里面有消息头和消息体
@RabbitListener(queues={"hello-java-queue"}) // 组件必须在容器中才可以监听
public void receiveMessageTest(Message message, OrderReturnReasonEntity content){
byte[] body = message.getBody(); // 消息体
MessageProperties properties = message.getMessageProperties(); // 消息头
System.out.println("接收的消息为" + body);
}
接收的消息为(Body:'{"id":1,"name":"test","sort":null,"status":1,"createTime":1621413291190}' MessageProperties [headers={__TypeId__=com.lookstarry.doermail.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1, consumerTag=amq.ctag-L8NVu3eZmSZRW2aWG85apA, consumerQueue=hello-java-queue])==>类型:class org.springframework.amqp.core.Message
- T<发送消息的类型> 后面添加上获取消息体的类型,可以通过该类直接获取内容
/**
* queues:声明需要监听的所有队列
* org.springframework.amqp.core.Message
* 参数是接收消息的类型
* 1、可以写成原生消息类型Message,里面有消息头和消息体
* 2、T<发送消息的类型> 后面添加上获取消息体的类型
*/
@RabbitListener(queues={"hello-java-queue"}) // 组件必须在容器中才可以监听
public void receiveMessageTest(Message message, OrderReturnReasonEntity content){
byte[] body = message.getBody(); // 消息体
MessageProperties properties = message.getMessageProperties(); // 消息头
System.out.println("接收的消息为" + message + "==>内容:" + content);
}
接收的消息为(Body:'{"id":1,"name":"test","sort":null,"status":1,"createTime":1621412699210}' MessageProperties [headers={__TypeId__=com.lookstarry.doermail.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1, consumerTag=amq.ctag-DSSfCxc2AVxmeRo2rhEu0Q, consumerQueue=hello-java-queue])==>内容:OrderReturnReasonEntity(id=1, name=test, sort=null, status=1, createTime=Wed May 19 16:24:59 CST 2021)
- Channel
2.@RabbitListener 标记在类上,@RabbitHandler 标记在该类某个方法上
这样做的好处就是,队列中有很多不同类型的消息,就可以通过@RabbitHandler 标记的不同的方法来处理不同类型的消息
@RabbitListener(queues={"hello-java-queue"}) // 组件必须在容器中才可以监听
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
/**
* queues:声明需要监听的所有队列
* org.springframework.amqp.core.Message
* 参数是接收消息的类型
* 1、可以写成原生消息类型Message,里面有消息头和消息体
* 2、T<发送消息的类型> 后面添加上获取消息体的类型
* 3、Channel:当前传输数据的通道
*
* Queue:队列可以有很多人都来监听,只要收到消息队列就会删除消息,而且只能有一个人收到此消息
* 场景:
* 1)订单服务启动多个:同一个消息,只能有一个客户端能够收到
* 2)只有一个消息完全处理完,方法运行结束才可以接收到下一个消息
*/
// 处理OrderReturnReasonEntity类型的消息
@RabbitHandler
public void receiveMessageTest(Message message,
OrderReturnReasonEntity content,
Channel channel) throws InterruptedException {
byte[] body = message.getBody(); // 消息体
MessageProperties properties = message.getMessageProperties(); // 消息头
System.out.println("接收的消息为" + message + "==>内容:" + content);
System.out.println("消息处理完成" + content.getName());
}
// 处理String类型的消息
@RabbitHandler
public void receiveMessage2(Message message,
String content){
byte[] body = message.getBody(); // 消息体
MessageProperties properties = message.getMessageProperties(); // 消息头
System.out.println("接收的消息为" + message + "==>内容:" + content);
System.out.println("消息处理完成" + content);
}
七、RabbitMQ 消息确认机制——可靠抵达
背景:
在分布式系统中,有很多的微服务连接到队列进行监听消息,可能会由于网络的抖动、服务宕机等原因,导致消息的丢失,比如发送者发消息时丢失了或者消费者在消费消息的时候消息未收到;因此为了保证消息不丢失,可靠抵达,可以使用事务消息,为此引入确认机制。**
事务消息:
发消息的时候首先客户端会建立连接,会在通道中发送消息,可以设置通道为事务模式,只有当整个消息发送出去,消费者消费了消息,一连串全部成功,才算消息发送成功;不过这样会导致性能下降 250 倍
因此引入确认机制,RabbitMQ 的确认机制是使用了两端机制,一端是生产者两种回调确认模式,一端是消费者一种 ack 模式
- publisher confirmCallback 确认模式
当生产者将消息发送给 Broker 服务器,服务器收到了消息就会回调confirmCallback,就可以知道哪些消息抵达了服务器
- publisher returnCallback 未投递到队列 queue 退出模式
服务器收到消息后,会使用 Exchange 交换机投递到 queue 中,如果消息没有成功投递到队列中,就会使用 returnCallback
- consumer ack 机制
消费者如果从队列中成功取到了消息,就会发送一个 ack 确认,Queue 收到 ack 确认后,就会将该消息从队列中删除
1.发送端确认confirmCallback配置,服务器收到消息就会回调
1)开启发送端确认
spring:
rabbitmq:
# 开启发送端确认
publisher-confirms: true
2)手动编码发送端确认confirmCallback执行逻辑
只要消息正确抵达了 Broker,就会执行该逻辑
@Configuration
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* 定制RabbitTemplate
*/
@PostConstruct // MyRabbitConfig调用构造器创建完成以后最后执行该方法
public void initRabbitTemplate(){
// 设置确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 只要消息正确抵达了Broker,就会执行该逻辑ack=true
* @param correlationData 当前消息的唯一关联数据(这个是消息的唯一id)
* @param ack 消息是否成功收到
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm...correlationData[" +correlationData + "]==>ack[" + ack + "]==>cause[" + cause + "]");
}
});
}
2.发送端投递到队列失败 retureCallback 设置
spring:
rabbitmq:
# 开启发送端消息抵达队列的确认
publisher-returns: true
# 只要抵达队列以异步模式优先回调这个returnConfigrm
template:
mandatory: true
// 消息抵达队列的确认回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息没有投递给指定的队列,就会出发这个失败回调
* @param message 投递失败的消息详细信息
* @param replayCode 回复的状态码
* @param replayText 回复的文本内容
* @param exchange 当时这个消息发送给哪个交换机
* @param routingKey 该消息的消息头使用的路由键
*/
@Override
public void returnedMessage(Message message, int replayCode, String replayText, String exchange, String routingKey) {
System.out.println("Fail Message[" + message + "]==>relayCode[" + replayCode + "]==>replayText[" + replayText + "]==>routingKey[" + routingKey + "]");
}
});
3.消费端确认
保证每个消息被正确消费,此时 broker 才会从队列中移除该消息;默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息 1.问题:我们收到很多消息,自动回复给服务器 ack,但是其实只有一个消息处理成功服务器就宕机了,所以就出现了消息丢失。
解决:手动确认,只要明确告诉 MQ,消息被签收,没有 ack,消息就一直会是 unack,即使 Consumer 宕机,消息也不会丢失,会重新变为 Ready,下一次有新的 Consumer 连接进来就会把消息发给他。 2.如何手动签收:
在处理消息结束后,利用 Channel 的 basicAck 来进行签收;此外也可以利用 basicNAck 来拒绝签收,拒签的货物可以指定 requeue 来指定是否重新入队,false 就直接会丢弃,true 会重新入队(和快递签收退签是否退回给商家)
业务成功完成就应该签收,业务失败就手动拒签
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
@RabbitHandler
public void receiveMessageTest(Message message,
OrderReturnReasonEntity content,
Channel channel) throws InterruptedException {
byte[] body = message.getBody(); // 消息体
MessageProperties properties = message.getMessageProperties(); // 消息头
System.out.println("接收的消息为" + message + "==>内容:" + content);
System.out.println("消息处理完成" + content.getName());
// 通道内按顺序自增的
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag==>" + deliveryTag);
try {
if(deliveryTag % 2 == 0){
// 手动签收消息,false为非批量签收模式
channel.basicAck(deliveryTag, false);
System.out.println("签收了货物..." + deliveryTag);
}else{
// 拒绝签收消息
// 第三个参数requeue为是否重新入队: false直接丢弃 true发回服务器,重新入队
channel.basicNack(deliveryTag, false, true);
System.out.println("没有签收了货物..." + deliveryTag);
}
} catch (IOException e) {
// 网络中断会造成异常
e.printStackTrace();
}
}
本作品采用 知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议 (CC BY-NC-ND 4.0) 进行许可。