异步解耦削峰利器MQ的系列教程文档,包括但不限于:RabbitMq,Kafaka,RocketMQ等消息队列的使用及进阶博文
SpringBoot消息队列系列教程
- 1: RabbitMQ
- 1.1: 1.springboot + rabbitmq初体验
- 1.2: 2.RabbitMq核心知识点小结
- 1.3: 3.发送消息基本使用姿势
- 1.4: 4.消息确认机制/事务的使用姿势
- 1.5: 5.RabbitListener消费基本使用姿势
1 - RabbitMQ
消息队列RabbitMQ的基础 & 进阶教程,系列文章教你在生产中使用消息队列的各种姿势
1.1 - 1.springboot + rabbitmq初体验
mq在异步解耦削峰的优势非常突出,现在很多的项目都会用到,掌握mq的知识点,了解如何顺畅的使用mq,可以说是一个必备的职业技能点了
接下来我们进入rabbitmq的学习过程
I. 环境准备
在测试之前,需要安装rabbitmq,下面分别给出mac + centos的安装教程
1. mac 安装
安装命令
brew install rabbitmq
## 进入安装目录
cd /usr/local/Cellar/rabbitmq/3.7.5
# 当前窗口启动
sbin/rabbitmq-server
# 后台启动
brew services start rabbitmq
启动控制台之前需要先开启插件
sbin/rabbitmq-plugins enable rabbitmq_management
进入控制台: http://localhost:15672/
用户名和密码:guest,guest
2. centos 安装
安装命令
yum install erlang
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el6.noarch.rpm
yum install rabbitmq-server-3.6.15-1.el6.noarch.rpm
插件开启
rabbitmq-plugins enable rabbitmq_management
# 启动
rabbitmq-server -detached
3. 配置
添加账号,设置权限
## 添加账号
./rabbitmqctl add_user admin admin
## 添加访问权限
./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
## 设置超级权限
./rabbitmqctl set_user_tags admin administrator
4. 项目环境
接下我们创建一个SpringBoot项目,用于简单的体验一下rabbitmq的发布和消费消息
- springboot版本为
2.2.1.RELEASE
- rabbitmq 版本为
3.7.5
依赖配置文件pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</pluginManagement>
</build>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/libs-release-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
在application.yml
配置文件中,添加rabbitmq的相关属性
spring:
rabbitmq:
virtual-host: /
username: admin
password: admin
port: 5672
II. 实例演示
接下来我们看一个hello world
版本的rabbitmq的使用姿势,一个简单发布消息、消费消息
1. 发布消息
消息发布,我们主要借助AmqpTemplate
来实现
@Component
public class PublishDemo {
@Autowired
private AmqpTemplate amqpTemplate;
public String publish2mq(String ans) {
String msg = "hello world = " + ans;
System.out.println("publish: " + msg);
amqpTemplate.convertAndSend(Pkg.exchange, Pkg.routing, msg);
return msg;
}
}
上面的case中,主要方法在于amqpTemplate#convertAndSend
,第一个参数为exchangeName, 第二个为routingKey
常量配置如下
class Pkg {
final static String exchange = "topic.e";
final static String routing = "r";
final static String queue = "topic.a";
}
2. 消费消息
消费消息,需要指定Queue,通过routingKey绑定exchange,如下
@Service
public class ConsumerDemo {
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = Pkg.queue, durable = "false", autoDelete = "true"),
exchange = @Exchange(value = Pkg.exchange, ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC), key = Pkg.routing))
public void consumer(String msg) {
System.out.println("consumer msg: " + msg);
}
}
3. 测试demo
写一个简单的rest接口,用于接收参数,发布消息到mq,并被ConsumerDemo
消费
@RestController
public class PubRest {
@Autowired
private PublishDemo publishDemo;
@GetMapping(path = {"", "/", "/publish"})
public String publish(String name) {
return publishDemo.publish2mq(name);
}
}
II. 其他
0. 项目
1.2 - 2.RabbitMq核心知识点小结
RabbitMQ是一个基于AMQP协议实现的企业级消息系统,想要顺畅的玩耍的前提是得先了解它,本文将主要介绍rabbitmq的一些基本知识点
- 特点
- 基本概念
- 消息投递消费的几种姿势
- 事务
- 集群
I. 基本知识点
它是采用Erlang语言实现的AMQP(Advanced Message Queued Protocol)的消息中间件,最初起源于金融系统,用在分布式系统存储转发消息,目前广泛应用于各类系统用于解耦、削峰
1.特点
首先得了解一下rabbitmq的特点,看看是否满足我们的系统需求(毕竟学习一个框架也是要不少时间的)
以下内容来自: MQ和RabbitMQ作用特点
主要特点,大致可以归纳为以下几个
- 可靠性:通过支持消息持久化,支持事务,支持消费和传输的ack等来确保可靠性
- 路由机制:支持主流的订阅消费模式,如广播,订阅,headers匹配等
- 扩展性:多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
- 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队仍然可用。
- 多种协议:RabbitMQ除了原生支持AMQP协议,还支持STOMP,MQTT等多种消息中间件协议。
- 多语言客户端:RabbitMQ几乎支持所有常用语言,比如Jav a、Python、Ruby、PHP、C#、JavaScript等。
- 管理界面:RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。
- 插件机制:RabbitMQ提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。
2. 基本概念
下图为rabbitmq的内部结构图
从上图也可以发现几个基本概念(Message, Publisher, Exchange, Binding, Queue, Channel, Consuer, Virtual host)
下面逐一进行说明
a. Message
具体的消息,包含消息头(即附属的配置信息)和消息体(即消息的实体内容)
由发布者,将消息推送到Exchange,由消费者从Queue中获取
b. Publisher
消息生产者,负责将消息发布到交换器(Exchange)
c. Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列
d. Binding
绑定,用于给Exchange和Queue建立关系,从而决定将这个交换器中的哪些消息,发送到对应的Queue
e. Queue
消息队列,用来保存消息直到发送给消费者
它是消息的容器,也是消息的终点
一个消息可投入一个或多个队列
消息一直在队列里面,等待消费者连接到这个队列将其取走
f. Connection
连接,内部持有一些channel,用于和queue打交道
g. Channel
信道(通道),MQ与外部打交道都是通过Channel来的,发布消息、订阅队列还是接收消息,这些动作都是通过Channel完成;
简单来说就是消息通过Channel塞进队列或者流出队列
h. Consumer
消费者,从消息队列中获取消息的主体
i. Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。
虚拟主机是共享相同的身份认证和加密环境的独立服务器域。
每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。
vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /
可以理解为db中的数据库的概念,用于逻辑拆分
j. Broker
消息队列服务器实体
3. 消息投递消费
从前面的内部结构图可以知晓,消息由生产者发布到Exchange,然后通过路由规则,分发到绑定queue上,供消费者获取消息
接下来我们看一下Exchange支持的四种策略
a. Direct策略
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中
简单来讲,就是rounting key
与binding key
完全匹配
- 如果一个队列绑定到交换机要求路由键为
dog
- 只转发
routing key
标记为dog
的消息, - 不会转发
dog.puppy
,也不会转发“dog.guard”等等 - 它是完全匹配、单播的模式
举例说明
Exchange和两个队列绑定在一起:
- Q1的bindingkey是orange
- Q2的binding key是black和green.
- 当Producer 发布一个消息,其
routing key
是orange
时, exchange会把它放到Q1上, 如果是black
或green
就会到Q2上, 其余的Message被丢弃
注意
- 当有多个队列绑定到同一个Exchange,且binding key相同时,这时消息会分发给所有满足条件的队列
b. Topic策略
这个策略可以看成是Direct策略的升级版,通过routing key
与 bingding key
的模式匹配方式来分发消息
简单来讲,直接策略是完全精确匹配,而topic则支持正则匹配,满足某类指定规则的(如以xxx开头的路由键),可以将消息分发过去
#
匹配0个或多个单词*
匹配不多不少一个单词
一个更直观的实例如下
Producer发送消息时需要设置routing_key,
- Q1 的binding key 是
*.orange.*
- Q2 是
*.*.rabbit
和lazy.#
: - 发布一个
routing key
为test.orange.mm
消息,则会路由到Q1;- 注意: 如果是
routng key
是test.orange
则无法路由到Q1, - 因为Q1的规则是三个单词,中间一个为orange,不满足这个规则的都无效
- 注意: 如果是
- 发布一个
routing key
为test.qq.rabbit
或者lazy.qq
的消息 都可以分发到Q2;即路由key为三个单词,最后一个为rabbit或者不限制单词个数,主要第一个是lazy的消息,都可以分发过来 - 如果发布的是一个
test.orange.rabbit
消息,则Q1和Q2都可以满足- 注意: 这时两个队列都会接受到这个消息
c. Fanout策略
广播策略,忽略routing key
和 binding key
,将消息分发给所有绑定在这个exchange上的queue
d. Headers策略
这个实际上用得不多,它是根据Message的一些头部信息来分发过滤Message,忽略routing key的属性,如果Header信息和message消息的头信息相匹配
II. 消息一致性问题
在进入rabbitmq如何保证一致性之前,我们先得理解,什么是消息一致性?
1. 一致性问题
按照我个人的粗浅理解,我认为的消息一致性,应该包含下面几个
- 生产者,确保消息发布成功
- 消息不会丢
- 顺序不会乱
- 消息不会重复(如重传,导致发布一次,却出现多个消息)
- 消费者,确保消息消费成功
- 有序消费
- 不重复消费
发送端
为了确保发布者推送的消息不会丢失,我们需要消息持久化
- broker持久化消息
为了确定消息正确接收
- publisher 需要知道消息投递并成功持久化
2. 持久化
这里的持久化,主要是指将内存中的消息保存到磁盘,避免mq宕机导致的内存中消息丢失;然而单纯的持久化,只是保证一致性的其中一个要素,比如publisher将消息发送到exchange,在broker持久化的工程中,宕机了导致持久化失败,而publisher并不知道持久化失败,这个时候就会出现数据丢失,为了解决这个问题,rabbitmq提供了事务机制
3. 事务机制
事务机制能够解决生产者与broker之间消息确认的问题,只有消息成功被broker接受,事务才能提交成功,否则就进行事务回滚操作并进行消息重发。但是使用事务机制会降低RabbitMQ的消息吞吐量,不适用于需要发布大量消息的业务场景。
注意,事务是同步的
4. 消息确认机制
消息确认机制,可以区分为生产端和消费端
生产端
- 生产者将信道设置成Confirm模式,一旦信道进入Confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(以confirm.select为基础从1开始计数),
- 一旦消息被投递到所有匹配的队列之后,Broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,
- 如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,
- Broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号(此外Broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理)
Confirm模式属性异步,publisher发布一条消息之后,在等信道返回确认的同时,依然可以继续发送下一条消息,所以小概率会出现投递的消息顺序和broker中持久化消息顺序不一致的问题
一般从编程角度出发,Confirm模式有三种姿势
- 普通Confirm模式:发送一条消息之后,等到服务器confirm,然后再发布下一条消息(串行发布)
- 批量Confirm模式:发送一批消息之后,等到服务器confirm,然后再发布下一批消息(如果失败,这一批消息全部重复,所以会有重复问题)
- 异步Confirm模式:提供一个回调方法,服务器confirm之后,触发回调方法,因此不会阻塞下一条消息的发送
消费端
ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。
- 如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中
- 如果在集群的情况下,RabbitMQ会立即将这个消息推送给这个在线的其他消费者。这种机制保证了在消费者服务端故障的时候,不丢失任何消息和任务
- 消息永远不会从RabbitMQ中删除,只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除
III. 集群
按照目前的发展趋势,一个不支持集群的中间件基本上是不会有市场的;rabbitmq也是支持集群的,下面简单的介绍一下常见的4种集群架构模式
以下内容来自网上博文,详情请点击右边:RabbitMQ 的4种集群架构
1. 主备模式
这个属于常见的集群模式了,但又不太一样
主节点提供读写,备用节点不提供读写。如果主节点挂了,就切换到备用节点,原来的备用节点升级为主节点提供读写服务,当原来的主节点恢复运行后,原来的主节点就变成备用节点
2. 远程模式
远程模式可以实现双活的一种模式,简称 shovel 模式,所谓的 shovel 就是把消息进行不同数据中心的复制工作,可以跨地域的让两个 MQ 集群互联,远距离通信和复制。
- Shovel 就是我们可以把消息进行数据中心的复制工作,我们可以跨地域的让两个 MQ 集群互联。
如上图,有两个异地的 MQ 集群(可以是更多的集群),当用户在地区 1 这里下单了,系统发消息到 1 区的 MQ 服务器,发现 MQ 服务已超过设定的阈值,负载过高,这条消息就会被转到 地区 2 的 MQ 服务器上,由 2 区的去执行后面的业务逻辑,相当于分摊我们的服务压力。
3. 镜像模式
非常经典的 mirror 镜像模式,保证 100% 数据不丢失。在实际工作中也是用得最多的,并且实现非常的简单,一般互联网大厂都会构建这种镜像集群模式。
如上图,用 KeepAlived 做了 HA-Proxy 的高可用,然后有 3 个节点的 MQ 服务,消息发送到主节点上,主节点通过 mirror 队列把数据同步到其他的 MQ 节点,这样来实现其高可靠
4. 多活模式
也是实现异地数据复制的主流模式,因为 shovel 模式配置比较复杂,所以一般来说,实现异地集群的都是采用这种双活 或者 多活模型来实现的。这种模式需要依赖 rabbitMQ 的 federation 插件,可以实现持续的,可靠的 AMQP 数据通信,多活模式在实际配置与应用非常的简单
rabbitMQ 部署架构采用双中心模式(多中心),那么在两套(或多套)数据中心各部署一套 rabbitMQ 集群,各中心的rabbitMQ 服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分队列消息共享。
federation 插件是一个不需要构建 cluster ,而在 brokers 之间传输消息的高性能插件,federation 插件可以在 brokers 或者 cluster 之间传输消息,连接的双方可以使用不同的 users 和 virtual hosts,双方也可以使用不同版本的 rabbitMQ 和 erlang。federation 插件使用 AMQP 协议通信,可以接受不连续的传输。federation 不是建立在集群上的,而是建立在单个节点上的,如图上黄色的 rabbit node 3 可以与绿色的 node1、node2、node3 中的任意一个利用 federation 插件进行数据同步。
IV. 其他
0. 项目
1. 相关博文
- RabbitMQ Tutorials
- MQ和RabbitMQ作用特点
- RabbitMq基础教程之基本概念
- RabbitMQ学习(六)——消息确认机制(Confirm模式)
- RabbitMQ 的4种集群架构
- Rabbitmq是如何来保证事务的
- rabbitmq消息一致性问题
2. 一灰灰Blog
尽信书则不如,以上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激
下面一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛
- 一灰灰Blog个人博客 https://blog.hhui.top
- 一灰灰Blog-Spring专题博客 http://spring.hhui.top
1.3 - 3.发送消息基本使用姿势
前面两篇博文,分别介绍了RabbitMq的核心知识点,以及整合SpringBoot的demo应用;接下来也该进入正题,看一下SpringBoot的环境下,如何玩转rabbitmq
本篇内容主要为消息发送,包括以下几点
RabbitTemplate
发送消息的基本使用姿势- 自定义消息基本属性
- 自定义消息转换器
AbstractMessageConverter
- 发送Object类型消息失败的case
I. 基本使用姿势
1. 配置
我们借助SpringBoot 2.2.1.RELEASE
+ rabbitmq 3.7.5
来完整项目搭建与测试
项目pom.xml如下
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件application.yml
内容如下
spring:
rabbitmq:
virtual-host: /
username: admin
password: admin
port: 5672
host: 127.0.0.1
2. 配置类
通过前面rabbitmq的知识点学习,我们可以知道发送端的主要逻辑 “将消息发送给exchange,然后根据不同的策略分发给对应的queue”
本篇博文主要讨论的是消息发送,为了后续的实例演示,我们定义一个topic模式的exchange,并绑定一个的queue;(因为对发送端而言,不同的exchange类型,对发送端的使用姿势影响并不大,有影响的是消费者)
public class MqConstants {
public static final String exchange = "topic.e";
public static final String routing = "r";
public final static String queue = "topic.a";
}
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(MqConstants.exchange);
}
@Bean
public Queue queue() {
// 创建一个持久化的队列
return new Queue(MqConstants.queue, true);
}
@Bean
public Binding binding(TopicExchange topicExchange, Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange).with(MqConstants.routing);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
}
3. 消息发送
消息发送,主要借助的是RabbitTemplate#convertAndSend
方法来实现,通常情况下,我们直接使用即可
@Service
public class BasicPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 一般的用法,推送消息
*
* @param ans
* @return
*/
private String publish2mq1(String ans) {
String msg = "Durable msg = " + ans;
System.out.println("publish: " + msg);
rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
return msg;
}
}
上面的核心点就一行rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
- 表示将msg发送给指定的exchange,并设置消息的路由键
请注意
通过上面的方式,发送的消息默认是持久化的,当持久化的消息,分发到持久化的队列时,会有消息的落盘操作;
在某些场景下,我们对消息的完整性要求并没有那么严格,反而更在意mq的性能,丢失一些数据也可以接受;这个时候我们可能需要定制一下发送的消息属性(比如将消息设置为非持久化的)
下面提供两种姿势,推荐第二种
/**
* 推送一个非持久化的消息,这个消息推送到持久化的队列时,mq重启,这个消息会丢失;上面的持久化消息不会丢失
*
* @param ans
* @return
*/
private String publish2mq2(String ans) {
MessageProperties properties = new MessageProperties();
properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
Message message = rabbitTemplate.getMessageConverter().toMessage("NonDurable = " + ans, properties);
rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, message);
System.out.println("publish: " + message);
return message.toString();
}
private String publish2mq3(String ans) {
String msg = "Define msg = " + ans;
rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("ta", "测试");
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
return message;
}
});
return msg;
}
注意
- 在实际的项目开发中,推荐使用
MessagePostProcessor
来定制消息属性 - 其次不推荐在每次发送消息时都创建一个
MessagePostProcessor
对象,请定义一个通用的对象,能复用就复用
4. 非序列化对象发送异常case
通过查看rabbitTemplate#convertAndSend
的接口定义,我们知道发送的消息可以是Object类型,那么是不是意味着任何对象,都可以推送给mq呢?
下面是一个测试case
private String publish2mq4(String ans) {
NonSerDO nonSerDO = new NonSerDO(18, ans);
System.out.println("publish: " + nonSerDO);
rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
return nonSerDO.toString();
}
@Data
public static class NonSerDO {
private Integer age;
private String name;
public NonSerDO(int age, String name) {
this.age = age;
this.name = name;
}
}
当我们调用上面的publish2mq4
方法时,并不会是想象中的直接成功,相反抛出一个参数类型异常
为什么会出现这个问题呢?从堆栈分析,我们知道RabbitTemplate默认是利用SimpleMessageConverter
来实现封装Message逻辑的,核心代码为
// 下面代码来自 org.springframework.amqp.support.converter.SimpleMessageConverter#createMessage
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
byte[] bytes = null;
if (object instanceof byte[]) {
bytes = (byte[]) object;
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
}
else if (object instanceof String) {
try {
bytes = ((String) object).getBytes(this.defaultCharset);
}
catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"failed to convert to Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
messageProperties.setContentEncoding(this.defaultCharset);
}
else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object);
}
catch (IllegalArgumentException e) {
throw new MessageConversionException(
"failed to convert to serialized Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
}
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
return new Message(bytes, messageProperties);
}
throw new IllegalArgumentException(getClass().getSimpleName()
+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}
上面逻辑很明确的指出了,只接受byte数组,string字符串,可序列化对象(这里使用的是jdk的序列化方式来实现对象和byte数组之间的互转)
- 所以我们传递一个非序列化的对象会参数非法的异常
自然而然的,我们会想有没有其他的MessageConverter
来友好的支持任何类型的对象
5. 自定义MessageConverter
接下来我们希望通过自定义一个json序列化方式的MessageConverter来解决上面的问题
一个比较简单的实现(利用FastJson来实现序列化/反序列化)
public static class SelfConverter extends AbstractMessageConverter {
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) {
messageProperties.setContentType("application/json");
return new Message(JSON.toJSONBytes(object), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
return JSON.parse(message.getBody());
}
}
重新定义一个rabbitTemplate
,并设置它的消息转换器为自定义的SelfConverter
@Bean
public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new SelfConverter());
return rabbitTemplate;
}
然后再次测试一下
@Service
public class JsonPublisher {
@Autowired
private RabbitTemplate jsonRabbitTemplate;
private String publish1(String ans) {
Map<String, Object> msg = new HashMap<>(8);
msg.put("msg", ans);
msg.put("type", "json");
msg.put("version", 123);
System.out.println("publish: " + msg);
jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
return msg.toString();
}
private String publish2(String ans) {
BasicPublisher.NonSerDO nonSerDO = new BasicPublisher.NonSerDO(18, "SELF_JSON" + ans);
System.out.println("publish: " + nonSerDO);
jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
return nonSerDO.toString();
}
}
mq内接收到的推送消息如下
6. Jackson2JsonMessageConverter
上面虽然实现了Json格式的消息转换,但是比较简陋;而且这么基础通用的功能,按照Spring全家桶的一贯作风,肯定是有现成可用的,没错,这就是Jackson2JsonMessageConverter
所以我们的使用姿势也可以如下
//定义RabbitTemplate
@Bean
public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
// 测试代码
@Autowired
private RabbitTemplate jacksonRabbitTemplate;
private String publish3(String ans) {
Map<String, Object> msg = new HashMap<>(8);
msg.put("msg", ans);
msg.put("type", "jackson");
msg.put("version", 456);
System.out.println("publish: " + msg);
jacksonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
return msg.toString();
}
下面是通过Jackson序列化消息后的内容,与我们自定义的有一些不同,多了headers
和content_encoding
7. 小结
本篇博文主要的知识点如下
- 通过
RabbitTemplate#convertAndSend
来实现消息分发 - 通过
MessagePostProcessor
来自定义消息的属性(请注意默认投递的消息时持久化的) - 默认的消息封装类为
SimpleMessageConverter
,只支持分发byte数组,字符串和可序列化的对象;不满足上面三个条件的方法调用会抛异常 - 我们可以通过实现
MessageConverter
接口,来定义自己的消息封装类,解决上面的问题
在RabbitMq的知识点博文中,明确提到了,为了确保消息被brocker正确接收,提供了消息确认机制和事务机制两种case,那么如果需要使用这两种方式,消息生产者需要怎么做呢?
限于篇幅,下一篇博文将带来在消息确认机制/事务机制下的发送消息使用姿势
II. 其他
0. 系列博文&项目源码
系列博文
项目源码
1.4 - 4.消息确认机制/事务的使用姿势
上一篇介绍了RabbitMq借助RabbitTemplate来发送消息的基本使用姿势,我们知道RabbitMq提供了两种机制,来确保发送端的消息被brocke正确接收,本文将主要介绍,在消息确认和事物两种机制的场景下,发送消息的使用姿势
I. 配置
首先创建一个SpringBoot项目,用于后续的演示
- springboot版本为
2.2.1.RELEASE
- rabbitmq 版本为
3.7.5
(安装教程可参考: 【MQ系列】springboot + rabbitmq初体验)
依赖配置文件pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</pluginManagement>
</build>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/libs-release-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
在application.yml
配置文件中,添加rabbitmq的相关属性
spring:
rabbitmq:
virtual-host: /
username: admin
password: admin
port: 5672
host: 127.0.0.1
II. 消息确认机制
本节来看一下消息确认机制的使用姿势,首先有必要了解一下什么是消息确认机制
1. 定义
简单来讲就是消息发送之后,需要接收到RabbitMq的正确反馈,然后才能判断消息是否正确发送成功;
一般来说,RabbitMq的业务逻辑包括以下几点
- 生产者将信道设置成Confirm模式,一旦信道进入Confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(以confirm.select为基础从1开始计数)
- 一旦消息被投递到所有匹配的队列之后,Broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了
- 如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出
- Broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号(此外Broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理)
2. 基本使用case
从上面的解释,可以知道发送消息端,需要先将信道设置为Confirm模式,RabbitProperties
配置类中,有个属性,正好是用来设置的这个参数的,所以我们可以直接在配置文件application.yml
中,添加下面的配置
spring:
rabbitmq:
# 在2.2.1.release版本中,下面这个配置属于删除状态,推荐使用后一种配置方式
# publisher-confirms: true
publisher-confirm-type: correlated
# 下面这个配置,表示接收mq返回的确认消息
publisher-returns: true
上面配置完毕之后,直接使用RabbitTemplate发送消息,表示已经支持Confirm模式了,但实际的使用,会有一点点区别,我们需要接收mq返回的消息,发送失败的回调(以实现重试逻辑等),所以一个典型的发送端代码可以如下
@Service
public class AckPublisher implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);
}
/**
* 接收发送后确认信息
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("ack send succeed: " + correlationData);
} else {
System.out.println("ack send failed: " + correlationData + "|" + cause);
}
}
/**
* 发送失败的回调
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("ack " + message + " 发送失败");
}
/**
* 一般的用法,推送消息
*
* @param ans
* @return
*/
public String publish(String ans) {
String msg = "ack msg = " + ans;
System.out.println("publish: " + msg);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, correlationData);
return msg;
}
}
请注意上面的实现,首先需要给RabbitTemplate设置回调,这两个不可或缺
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);
3. 手动配置方式
上面利用的是标准的SpringBoot配置,一般来说是适用于绝大多数的场景的;当不能覆盖的时候,还可以通过手动的方式来定义一个特定的RabbitTemplate(比如一个项目中,只有某一个场景的消息发送需要确认机制,其他的默认即可,所以需要区分RabbitTemplate)
在自动配置类中,可以手动的注册一个RabbitTemplate的bean,来专职消息确认模式的发送
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private Integer port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Bean
public RabbitTemplate ackRabbitTemplate() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
// 设置ack为true
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
return new RabbitTemplate(connectionFactory);
}
至于使用姿势,和前面完全一致,只是将rabbitTemplate
换成ackRabbitTemplate
III. 事务机制
消息确认机制属于异步模式,也就是说一个消息发送完毕之后,不待返回,就可以发送另外一条消息;这里就会有一个问题,publisher先后发送msg1, msg2,但是对RabbitMq而言,接收的顺序可能是msg2, msg1;所以消息的顺序可能会不一致
所以有了更加严格的事务机制,它属于同步模式,发送消息之后,等到接收到确认返回之后,才能发送下一条消息
1. 事务使用方式
首先我们定义一个事务管理器
/**
* 配置rabbitmq事务
*
* @param connectionFactory
* @return
*/
@Bean("rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
@Bean
public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
事务机制的使用姿势,看起来和上面的消息确认差不多,无非是需要添加一个@Transactional
注解罢了
@Service
public class TransactionPublisher implements RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate transactionRabbitTemplate;
@PostConstruct
public void init() {
// 将信道设置为事务模式
transactionRabbitTemplate.setChannelTransacted(true);
transactionRabbitTemplate.setReturnCallback(this);
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("事务 " + message + " 发送失败");
}
/**
* 一般的用法,推送消息
*
* @param ans
* @return
*/
@Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")
public String publish(String ans) {
String msg = "transaction msg = " + ans;
System.out.println("publish: " + msg);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
transactionRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, correlationData);
return msg;
}
}
请注意,核心代码设置信道为事务模式必不可少
// 将信道设置为事务模式
transactionRabbitTemplate.setChannelTransacted(true);
IV. 测试
我们这里主要测试一下事务和消息确认机制的性能对比吧,从定义上来看消息确认机制效率更高,我们简单的对比一下
@RestController
public class PubRest {
@Autowired
private AckPublisher ackPublisher;
@Autowired
private TransactionPublisher transactionPublisher;
private AtomicInteger atomicInteger = new AtomicInteger(1);
@GetMapping(path = "judge")
public boolean judge(String name) {
for (int i = 0; i < 10; i++) {
long start = System.currentTimeMillis();
ackPublisher.publish(name + atomicInteger.getAndIncrement());
ackPublisher.publish(name + atomicInteger.getAndIncrement());
ackPublisher.publish(name + atomicInteger.getAndIncrement());
long mid = System.currentTimeMillis();
System.out.println("ack cost: " + (mid - start));
transactionPublisher.publish(name + atomicInteger.getAndIncrement());
transactionPublisher.publish(name + atomicInteger.getAndIncrement());
transactionPublisher.publish(name + atomicInteger.getAndIncrement());
System.out.println("transaction cost: " + (System.currentTimeMillis() - mid));
}
return true;
}
}
去掉无关的输出,仅保留耗时,对比如下(差距还是很明显的)
ack cost: 5
transaction cost: 111
ack cost: 3
transaction cost: 108
ack cost: 2
transaction cost: 101
ack cost: 3
transaction cost: 107
ack cost: 14
transaction cost: 106
ack cost: 2
transaction cost: 140
ack cost: 4
transaction cost: 124
ack cost: 4
transaction cost: 131
ack cost: 4
transaction cost: 129
ack cost: 2
transaction cost: 99
V. 其他
系列博文
项目源码
1.5 - 5.RabbitListener消费基本使用姿势
之前介绍了rabbitmq的消息发送姿势,既然有发送,当然就得有消费者,在SpringBoot环境下,消费可以说比较简单了,借助@RabbitListener
注解,基本上可以满足你90%以上的业务开发需求
下面我们来看一下@RabbitListener
的最最常用使用姿势
I. 配置
首先创建一个SpringBoot项目,用于后续的演示
- springboot版本为
2.2.1.RELEASE
- rabbitmq 版本为
3.7.5
(安装教程可参考: 【MQ系列】springboot + rabbitmq初体验)
依赖配置文件pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 注意,下面这个不是必要的哦-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</pluginManagement>
</build>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/libs-release-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
在application.yml
配置文件中,添加rabbitmq的相关属性
spring:
rabbitmq:
virtual-host: /
username: admin
password: admin
port: 5672
host: 127.0.0.1
II. 消费姿势
本文将目标放在实用性上,将结合具体的场景来演示@RabbitListener
的使用姿势,因此当你发现看完本文之后这个注解里面有些属性还是不懂,请不要着急,下一篇会一一道来
0. mock数据
消费消费,没有数据,怎么消费呢?所以我们第一步,先创建一个消息生产者,可以往exchange写数据,供后续的消费者测试使用
本篇的消费主要以topic模式来进行说明(其他的几个模式使用差别不大,如果有需求的话,后续补齐)
@RestController
public class PublishRest {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(path = "publish")
public boolean publish(String exchange, String routing, String data) {
rabbitTemplate.convertAndSend(exchange, routing, data);
return true;
}
}
提供一个简单rest接口,可以指定往哪个exchange推送数据,并制定路由键
1. case1: exchange, queue已存在
对于消费者而言其实是不需要管理exchange的创建/销毁的,它是由发送者定义的;一般来讲,消费者更关注的是自己的queue,包括定义queue并与exchange绑定,而这一套过程是可以直接通过rabbitmq的控制台操作的哦
所以实际开发过程中,exchange和queue以及对应的绑定关系已经存在的可能性是很高的,并不需要再代码中额外处理;
在这种场景下,消费数据,可以说非常非常简单了,如下:
/**
* 当队列已经存在时,直接指定队列名的方式消费
*
* @param data
*/
@RabbitListener(queues = "topic.a")
public void consumerExistsQueue(String data) {
System.out.println("consumerExistsQueue: " + data);
}
直接指定注解中的queues
参数即可,参数值为对列名(queueName)
2. case2: queue不存在
当queue的autoDelete属性为false时,上面的使用场景还是比较合适了;但是,当这个属性为true时,没有消费者队列就会自动删除了,这个时候再用上面的姿势,可能会得到下面的异常
通常这种场景下,是需要我们来主动创建Queue,并建立与Exchange的绑定关系,下面给出@RabbitListener
的推荐使用姿势
/**
* 队列不存在时,需要创建一个队列,并且与exchange绑定
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "topic.n1", durable = "false", autoDelete = "true"),
exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC),
key = "r"))
public void consumerNoQueue(String data) {
System.out.println("consumerNoQueue: " + data);
}
一个注解,内部声明了队列,并建立绑定关系,就是这么神奇!!!
注意@QueueBinding
注解的三个属性:
- value: @Queue注解,用于声明队列,value为queueName, durable表示队列是否持久化, autoDelete表示没有消费者之后队列是否自动删除
- exchange: @Exchange注解,用于声明exchange, type指定消息投递策略,我们这里用的topic方式
- key: 在topic方式下,这个就是我们熟知的 routingKey
以上,就是在队列不存在时的使用姿势,看起来也不复杂
3. case3: ack
在前面rabbitmq的核心知识点学习过程中,会知道为了保证数据的一致性,有一个消息确认机制;
我们这里的ack主要是针对消费端而言,当我们希望更改默认ack方式(noack, auto, manual),可以如下处理
/**
* 需要手动ack,但是不ack时
*
* @param data
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n2", durable = "false", autoDelete = "true"),
exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), ackMode = "MANUAL")
public void consumerNoAck(String data) {
// 要求手动ack,这里不ack,会怎样?
System.out.println("consumerNoAck: " + data);
}
上面的实现也比较简单,设置ackMode=MANUAL
,手动ack
但是,请注意我们的实现中,没有任何一个地方体现了手动ack,这就相当于一致都没有ack,在后面的测试中,可以看出这种不ack时,会发现数据一直在unacked
这一栏,当Unacked数量超过限制的时候,就不会再消费新的数据了
4. case4: manual ack
上面虽然选择ack方式,但是还缺一步ack的逻辑,接下来我们看一下如何补齐
/**
* 手动ack
*
* @param data
* @param deliveryTag
* @param channel
* @throws IOException
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n3", durable = "false", autoDelete = "true"),
exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), ackMode = "MANUAL")
public void consumerDoAck(String data, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)
throws IOException {
System.out.println("consumerDoAck: " + data);
if (data.contains("success")) {
// RabbitMQ的ack机制中,第二个参数返回true,表示需要将这条消息投递给其他的消费者重新消费
channel.basicAck(deliveryTag, false);
} else {
// 第三个参数true,表示这个消息会重新进入队列
channel.basicNack(deliveryTag, false, true);
}
}
请注意,方法多了两个参数
deliveryTag
: 相当于消息的唯一标识,用于mq辨别是哪个消息被ack/nak了channel
: mq和consumer之间的管道,通过它来ack/nak
当我们正确消费时,通过调用 basicAck
方法即可
// RabbitMQ的ack机制中,第二个参数返回true,表示需要将这条消息投递给其他的消费者重新消费
channel.basicAck(deliveryTag, false);
当我们消费失败,需要将消息重新塞入队列,等待重新消费时,可以使用 basicNack
// 第三个参数true,表示这个消息会重新进入队列
channel.basicNack(deliveryTag, false, true);
5. case5: 并发消费
当消息很多,一个消费者吭哧吭哧的消费太慢,但是我的机器性能又杠杠的,这个时候我就希望并行消费,相当于同时有多个消费者来处理数据
要支持并行消费,如下设置即可
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n4", durable = "false", autoDelete = "true"),
exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), concurrency = "4")
public void multiConsumer(String data) {
System.out.println("multiConsumer: " + data);
}
请注意注解中的concurrency = "4"
属性,表示固定4个消费者;
除了上面这种赋值方式之外,还有一种 m-n
的格式,表示m个并行消费者,最多可以有n个
(额外说明:这个参数的解释实在SimpleMessageListenerContainer
的场景下的,下一篇文章会介绍它与DirectMessageListenerContainer
的区别)
6. 测试
通过前面预留的消息发送接口,我们在浏览器中请求: http://localhost:8080/publish?exchange=topic.e&routing=r&data=wahaha
然后看一下输出,五个消费者都接收到了,特别是主动nak的那个消费者,一直在接收到消息;
(因为一直打印日志,所以重启一下应用,开始下一个测试)
然后再发送一条成功的消息,验证下手动真确ack,是否还会出现上面的情况,请求命令: http://localhost:8080/publish?exchange=topic.e&routing=r&data=successMsg
然后再关注一下,没有ack的那个队列,一直有一个unack的消息
II. 其他
系列博文
- 【MQ系列】springboot + rabbitmq初体验
- 【MQ系列】RabbitMq核心知识点小结
- 【MQ系列】SprigBoot + RabbitMq发送消息基本使用姿势
- 【MQ系列】RabbitMq消息确认/事务机制的使用姿势
项目源码
- 工程:https://github.com/liuyueyi/spring-boot-demo
- 源码:https://github.com/liuyueyi/spring-boot-demo/tree/master/spring-boot/302-rabbitmq-consumer
1. 一灰灰Blog: https://liuyueyi.github.io/hexblog
一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛
2. 声明
尽信书则不如,以上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激
- 微博地址: 小灰灰Blog
- QQ: 一灰灰/3302797840
3. 扫描关注
一灰灰blog