这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

SpringBoot消息队列系列教程

异步解耦削峰利器MQ的系列教程文档,包括但不限于:RabbitMq,Kafaka,RocketMQ等消息队列的使用及进阶博文

1 - RabbitMQ

消息队列RabbitMQ的基础 & 进阶教程,系列文章教你在生产中使用消息队列的各种姿势

1.1 - 1.springboot + rabbitmq初体验

mq在异步解耦削峰的优势非常突出,现在很多的项目都会用到,掌握mq的知识点,了解如何顺畅的使用mq,可以说是一个必备的职业技能点了

接下来我们进入rabbitmq的学习过程

I. 环境准备

在测试之前,需要安装rabbitmq,下面分别给出mac + centos的安装教程

1. mac 安装

安装命令

brew install rabbitmq

## 进入安装目录
cd /usr/local/Cellar/rabbitmq/3.7.5

# 当前窗口启动
sbin/rabbitmq-server

# 后台启动
brew services start rabbitmq

启动控制台之前需要先开启插件

sbin/rabbitmq-plugins enable rabbitmq_management

进入控制台: http://localhost:15672/

用户名和密码:guest,guest

2. centos 安装

安装命令

yum install erlang
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el6.noarch.rpm
yum install rabbitmq-server-3.6.15-1.el6.noarch.rpm

插件开启

rabbitmq-plugins enable rabbitmq_management
# 启动
rabbitmq-server -detached

3. 配置

添加账号,设置权限

## 添加账号
./rabbitmqctl add_user admin admin
## 添加访问权限
./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
## 设置超级权限
./rabbitmqctl set_user_tags admin administrator

4. 项目环境

接下我们创建一个SpringBoot项目,用于简单的体验一下rabbitmq的发布和消费消息

  • springboot版本为2.2.1.RELEASE
  • rabbitmq 版本为 3.7.5

依赖配置文件pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </pluginManagement>
</build>
<repositories>
    <repository>
        <id>spring-snapshots</id>
        <name>Spring Snapshots</name>
        <url>https://repo.spring.io/libs-snapshot-local</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/libs-milestone-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-releases</id>
        <name>Spring Releases</name>
        <url>https://repo.spring.io/libs-release-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>

application.yml配置文件中,添加rabbitmq的相关属性

spring:
  rabbitmq:
    virtual-host: /
    username: admin
    password: admin
    port: 5672

II. 实例演示

接下来我们看一个hello world版本的rabbitmq的使用姿势,一个简单发布消息、消费消息

1. 发布消息

消息发布,我们主要借助AmqpTemplate来实现

@Component
public class PublishDemo {
    @Autowired
    private AmqpTemplate amqpTemplate;

    public String publish2mq(String ans) {
        String msg = "hello world = " + ans;
        System.out.println("publish: " + msg);
        amqpTemplate.convertAndSend(Pkg.exchange, Pkg.routing, msg);
        return msg;
    }
}

上面的case中,主要方法在于amqpTemplate#convertAndSend,第一个参数为exchangeName, 第二个为routingKey

常量配置如下

class Pkg {
    final static String exchange = "topic.e";
    final static String routing = "r";
    final static String queue = "topic.a";
}

2. 消费消息

消费消息,需要指定Queue,通过routingKey绑定exchange,如下

@Service
public class ConsumerDemo {

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = Pkg.queue, durable = "false", autoDelete = "true"),
            exchange = @Exchange(value = Pkg.exchange, ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC), key = Pkg.routing))
    public void consumer(String msg) {
        System.out.println("consumer msg: " + msg);
    }
}

3. 测试demo

写一个简单的rest接口,用于接收参数,发布消息到mq,并被ConsumerDemo消费

@RestController
public class PubRest {
    @Autowired
    private PublishDemo publishDemo;

    @GetMapping(path = {"", "/", "/publish"})
    public String publish(String name) {
        return publishDemo.publish2mq(name);
    }
}

II. 其他

0. 项目

1.2 - 2.RabbitMq核心知识点小结

RabbitMQ是一个基于AMQP协议实现的企业级消息系统,想要顺畅的玩耍的前提是得先了解它,本文将主要介绍rabbitmq的一些基本知识点

  • 特点
  • 基本概念
  • 消息投递消费的几种姿势
  • 事务
  • 集群

I. 基本知识点

它是采用Erlang语言实现的AMQP(Advanced Message Queued Protocol)的消息中间件,最初起源于金融系统,用在分布式系统存储转发消息,目前广泛应用于各类系统用于解耦、削峰

1.特点

首先得了解一下rabbitmq的特点,看看是否满足我们的系统需求(毕竟学习一个框架也是要不少时间的)

以下内容来自: MQ和RabbitMQ作用特点

主要特点,大致可以归纳为以下几个

  • 可靠性:通过支持消息持久化,支持事务,支持消费和传输的ack等来确保可靠性
  • 路由机制:支持主流的订阅消费模式,如广播,订阅,headers匹配等
  • 扩展性:多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
  • 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队仍然可用。
  • 多种协议:RabbitMQ除了原生支持AMQP协议,还支持STOMP,MQTT等多种消息中间件协议。
  • 多语言客户端:RabbitMQ几乎支持所有常用语言,比如Jav a、Python、Ruby、PHP、C#、JavaScript等。
  • 管理界面:RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。
  • 插件机制:RabbitMQ提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。

2. 基本概念

下图为rabbitmq的内部结构图

从上图也可以发现几个基本概念(Message, Publisher, Exchange, Binding, Queue, Channel, Consuer, Virtual host)

下面逐一进行说明

a. Message

具体的消息,包含消息头(即附属的配置信息)和消息体(即消息的实体内容)

由发布者,将消息推送到Exchange,由消费者从Queue中获取

b. Publisher

消息生产者,负责将消息发布到交换器(Exchange)

c. Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列

d. Binding

绑定,用于给Exchange和Queue建立关系,从而决定将这个交换器中的哪些消息,发送到对应的Queue

e. Queue

消息队列,用来保存消息直到发送给消费者

它是消息的容器,也是消息的终点

一个消息可投入一个或多个队列

消息一直在队列里面,等待消费者连接到这个队列将其取走

f. Connection

连接,内部持有一些channel,用于和queue打交道

g. Channel

信道(通道),MQ与外部打交道都是通过Channel来的,发布消息、订阅队列还是接收消息,这些动作都是通过Channel完成;

简单来说就是消息通过Channel塞进队列或者流出队列

h. Consumer

消费者,从消息队列中获取消息的主体

i. Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。

虚拟主机是共享相同的身份认证和加密环境的独立服务器域。

每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。

vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /

可以理解为db中的数据库的概念,用于逻辑拆分

j. Broker

消息队列服务器实体

3. 消息投递消费

从前面的内部结构图可以知晓,消息由生产者发布到Exchange,然后通过路由规则,分发到绑定queue上,供消费者获取消息

接下来我们看一下Exchange支持的四种策略

a. Direct策略

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中

简单来讲,就是rounting keybinding key完全匹配

  • 如果一个队列绑定到交换机要求路由键为dog
  • 只转发routing key 标记为dog的消息,
  • 不会转发dog.puppy,也不会转发“dog.guard”等等
  • 它是完全匹配、单播的模式

举例说明

Exchange和两个队列绑定在一起:

  • Q1的bindingkey是orange
  • Q2的binding key是black和green.
  • 当Producer 发布一个消息,其routing keyorange时, exchange会把它放到Q1上, 如果是blackgreen就会到Q2上, 其余的Message被丢弃

注意

  • 当有多个队列绑定到同一个Exchange,且binding key相同时,这时消息会分发给所有满足条件的队列

b. Topic策略

这个策略可以看成是Direct策略的升级版,通过routing keybingding key的模式匹配方式来分发消息

简单来讲,直接策略是完全精确匹配,而topic则支持正则匹配,满足某类指定规则的(如以xxx开头的路由键),可以将消息分发过去

  • # 匹配0个或多个单词
  • * 匹配不多不少一个单词

一个更直观的实例如下

Producer发送消息时需要设置routing_key,

  • Q1 的binding key 是*.orange.*
  • Q2 是 *.*.rabbitlazy.#
  • 发布一个routing keytest.orange.mm 消息,则会路由到Q1;
    • 注意: 如果是routng keytest.orange则无法路由到Q1,
    • 因为Q1的规则是三个单词,中间一个为orange,不满足这个规则的都无效
  • 发布一个routing keytest.qq.rabbit或者lazy.qq的消息 都可以分发到Q2;即路由key为三个单词,最后一个为rabbit或者不限制单词个数,主要第一个是lazy的消息,都可以分发过来
  • 如果发布的是一个test.orange.rabbit消息,则Q1和Q2都可以满足
    • 注意: 这时两个队列都会接受到这个消息

c. Fanout策略

广播策略,忽略routing keybinding key,将消息分发给所有绑定在这个exchange上的queue

d. Headers策略

这个实际上用得不多,它是根据Message的一些头部信息来分发过滤Message,忽略routing key的属性,如果Header信息和message消息的头信息相匹配

II. 消息一致性问题

在进入rabbitmq如何保证一致性之前,我们先得理解,什么是消息一致性?

1. 一致性问题

数据的一致性是什么

按照我个人的粗浅理解,我认为的消息一致性,应该包含下面几个

  • 生产者,确保消息发布成功
    • 消息不会丢
    • 顺序不会乱
    • 消息不会重复(如重传,导致发布一次,却出现多个消息)
  • 消费者,确保消息消费成功
    • 有序消费
    • 不重复消费

发送端

为了确保发布者推送的消息不会丢失,我们需要消息持久化

  • broker持久化消息

为了确定消息正确接收

  • publisher 需要知道消息投递并成功持久化

2. 持久化

这里的持久化,主要是指将内存中的消息保存到磁盘,避免mq宕机导致的内存中消息丢失;然而单纯的持久化,只是保证一致性的其中一个要素,比如publisher将消息发送到exchange,在broker持久化的工程中,宕机了导致持久化失败,而publisher并不知道持久化失败,这个时候就会出现数据丢失,为了解决这个问题,rabbitmq提供了事务机制

3. 事务机制

事务机制能够解决生产者与broker之间消息确认的问题,只有消息成功被broker接受,事务才能提交成功,否则就进行事务回滚操作并进行消息重发。但是使用事务机制会降低RabbitMQ的消息吞吐量,不适用于需要发布大量消息的业务场景。

注意,事务是同步的

4. 消息确认机制

RabbitMQ学习(六)——消息确认机制(Confirm模式)

消息确认机制,可以区分为生产端和消费端

生产端

  • 生产者将信道设置成Confirm模式,一旦信道进入Confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(以confirm.select为基础从1开始计数),
  • 一旦消息被投递到所有匹配的队列之后,Broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,
  • 如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,
  • Broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号(此外Broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理)

Confirm模式属性异步,publisher发布一条消息之后,在等信道返回确认的同时,依然可以继续发送下一条消息,所以小概率会出现投递的消息顺序和broker中持久化消息顺序不一致的问题

一般从编程角度出发,Confirm模式有三种姿势

  • 普通Confirm模式:发送一条消息之后,等到服务器confirm,然后再发布下一条消息(串行发布)
  • 批量Confirm模式:发送一批消息之后,等到服务器confirm,然后再发布下一批消息(如果失败,这一批消息全部重复,所以会有重复问题)
  • 异步Confirm模式:提供一个回调方法,服务器confirm之后,触发回调方法,因此不会阻塞下一条消息的发送

消费端

ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。

  • 如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中
  • 如果在集群的情况下,RabbitMQ会立即将这个消息推送给这个在线的其他消费者。这种机制保证了在消费者服务端故障的时候,不丢失任何消息和任务
  • 消息永远不会从RabbitMQ中删除,只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除

III. 集群

按照目前的发展趋势,一个不支持集群的中间件基本上是不会有市场的;rabbitmq也是支持集群的,下面简单的介绍一下常见的4种集群架构模式

以下内容来自网上博文,详情请点击右边:RabbitMQ 的4种集群架构

1. 主备模式

这个属于常见的集群模式了,但又不太一样

主节点提供读写,备用节点不提供读写。如果主节点挂了,就切换到备用节点,原来的备用节点升级为主节点提供读写服务,当原来的主节点恢复运行后,原来的主节点就变成备用节点

2. 远程模式

远程模式可以实现双活的一种模式,简称 shovel 模式,所谓的 shovel 就是把消息进行不同数据中心的复制工作,可以跨地域的让两个 MQ 集群互联,远距离通信和复制。

  • Shovel 就是我们可以把消息进行数据中心的复制工作,我们可以跨地域的让两个 MQ 集群互联。

如上图,有两个异地的 MQ 集群(可以是更多的集群),当用户在地区 1 这里下单了,系统发消息到 1 区的 MQ 服务器,发现 MQ 服务已超过设定的阈值,负载过高,这条消息就会被转到 地区 2 的 MQ 服务器上,由 2 区的去执行后面的业务逻辑,相当于分摊我们的服务压力。

3. 镜像模式

非常经典的 mirror 镜像模式,保证 100% 数据不丢失。在实际工作中也是用得最多的,并且实现非常的简单,一般互联网大厂都会构建这种镜像集群模式。

如上图,用 KeepAlived 做了 HA-Proxy 的高可用,然后有 3 个节点的 MQ 服务,消息发送到主节点上,主节点通过 mirror 队列把数据同步到其他的 MQ 节点,这样来实现其高可靠

4. 多活模式

也是实现异地数据复制的主流模式,因为 shovel 模式配置比较复杂,所以一般来说,实现异地集群的都是采用这种双活 或者 多活模型来实现的。这种模式需要依赖 rabbitMQ 的 federation 插件,可以实现持续的,可靠的 AMQP 数据通信,多活模式在实际配置与应用非常的简单

rabbitMQ 部署架构采用双中心模式(多中心),那么在两套(或多套)数据中心各部署一套 rabbitMQ 集群,各中心的rabbitMQ 服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分队列消息共享。

federation 插件是一个不需要构建 cluster ,而在 brokers 之间传输消息的高性能插件,federation 插件可以在 brokers 或者 cluster 之间传输消息,连接的双方可以使用不同的 users 和 virtual hosts,双方也可以使用不同版本的 rabbitMQ 和 erlang。federation 插件使用 AMQP 协议通信,可以接受不连续的传输。federation 不是建立在集群上的,而是建立在单个节点上的,如图上黄色的 rabbit node 3 可以与绿色的 node1、node2、node3 中的任意一个利用 federation 插件进行数据同步。

IV. 其他

0. 项目

1. 相关博文

2. 一灰灰Blog

尽信书则不如,以上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

下面一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

一灰灰blog

1.3 - 3.发送消息基本使用姿势

前面两篇博文,分别介绍了RabbitMq的核心知识点,以及整合SpringBoot的demo应用;接下来也该进入正题,看一下SpringBoot的环境下,如何玩转rabbitmq

本篇内容主要为消息发送,包括以下几点

  • RabbitTemplate 发送消息的基本使用姿势
  • 自定义消息基本属性
  • 自定义消息转换器AbstractMessageConverter
  • 发送Object类型消息失败的case

I. 基本使用姿势

1. 配置

我们借助SpringBoot 2.2.1.RELEASE + rabbitmq 3.7.5来完整项目搭建与测试

项目pom.xml如下

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件application.yml内容如下

spring:
  rabbitmq:
    virtual-host: /
    username: admin
    password: admin
    port: 5672
    host: 127.0.0.1

2. 配置类

通过前面rabbitmq的知识点学习,我们可以知道发送端的主要逻辑 “将消息发送给exchange,然后根据不同的策略分发给对应的queue”

本篇博文主要讨论的是消息发送,为了后续的实例演示,我们定义一个topic模式的exchange,并绑定一个的queue;(因为对发送端而言,不同的exchange类型,对发送端的使用姿势影响并不大,有影响的是消费者)

public class MqConstants {

    public static final String exchange = "topic.e";

    public static final String routing = "r";

    public final static String queue = "topic.a";

}

@Configuration
public class MqConfig {
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(MqConstants.exchange);
    }

    @Bean
    public Queue queue() {
        // 创建一个持久化的队列
        return new Queue(MqConstants.queue, true);
    }

    @Bean
    public Binding binding(TopicExchange topicExchange, Queue queue) {
        return BindingBuilder.bind(queue).to(topicExchange).with(MqConstants.routing);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}

3. 消息发送

消息发送,主要借助的是RabbitTemplate#convertAndSend方法来实现,通常情况下,我们直接使用即可

@Service
public class BasicPublisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    /**
     * 一般的用法,推送消息
     *
     * @param ans
     * @return
     */
    private String publish2mq1(String ans) {
        String msg = "Durable msg = " + ans;
        System.out.println("publish: " + msg);
        rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
        return msg;
    }
}

上面的核心点就一行rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);

  • 表示将msg发送给指定的exchange,并设置消息的路由键

请注意

通过上面的方式,发送的消息默认是持久化的,当持久化的消息,分发到持久化的队列时,会有消息的落盘操作;

在某些场景下,我们对消息的完整性要求并没有那么严格,反而更在意mq的性能,丢失一些数据也可以接受;这个时候我们可能需要定制一下发送的消息属性(比如将消息设置为非持久化的)

下面提供两种姿势,推荐第二种

/**
 * 推送一个非持久化的消息,这个消息推送到持久化的队列时,mq重启,这个消息会丢失;上面的持久化消息不会丢失
 *
 * @param ans
 * @return
 */
private String publish2mq2(String ans) {
    MessageProperties properties = new MessageProperties();
    properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
    Message message = rabbitTemplate.getMessageConverter().toMessage("NonDurable = " + ans, properties);

    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, message);

    System.out.println("publish: " + message);
    return message.toString();
}


private String publish2mq3(String ans) {
    String msg = "Define msg = " + ans;
    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setHeader("ta", "测试");
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        }
    });

    return msg;
}

注意

  • 在实际的项目开发中,推荐使用MessagePostProcessor来定制消息属性
  • 其次不推荐在每次发送消息时都创建一个MessagePostProcessor对象,请定义一个通用的对象,能复用就复用

4. 非序列化对象发送异常case

通过查看rabbitTemplate#convertAndSend的接口定义,我们知道发送的消息可以是Object类型,那么是不是意味着任何对象,都可以推送给mq呢?

下面是一个测试case

private String publish2mq4(String ans) {
    NonSerDO nonSerDO = new NonSerDO(18, ans);
    System.out.println("publish: " + nonSerDO);
    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
    return nonSerDO.toString();
}


@Data
public static class NonSerDO {
    private Integer age;
    private String name;

    public NonSerDO(int age, String name) {
        this.age = age;
        this.name = name;
    }
}

当我们调用上面的publish2mq4方法时,并不会是想象中的直接成功,相反抛出一个参数类型异常

为什么会出现这个问题呢?从堆栈分析,我们知道RabbitTemplate默认是利用SimpleMessageConverter来实现封装Message逻辑的,核心代码为

// 下面代码来自 org.springframework.amqp.support.converter.SimpleMessageConverter#createMessage
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
	byte[] bytes = null;
	if (object instanceof byte[]) {
		bytes = (byte[]) object;
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
	}
	else if (object instanceof String) {
		try {
			bytes = ((String) object).getBytes(this.defaultCharset);
		}
		catch (UnsupportedEncodingException e) {
			throw new MessageConversionException(
					"failed to convert to Message content", e);
		}
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
		messageProperties.setContentEncoding(this.defaultCharset);
	}
	else if (object instanceof Serializable) {
		try {
			bytes = SerializationUtils.serialize(object);
		}
		catch (IllegalArgumentException e) {
			throw new MessageConversionException(
					"failed to convert to serialized Message content", e);
		}
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
	}
	if (bytes != null) {
		messageProperties.setContentLength(bytes.length);
		return new Message(bytes, messageProperties);
	}
	throw new IllegalArgumentException(getClass().getSimpleName()
			+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}

上面逻辑很明确的指出了,只接受byte数组,string字符串,可序列化对象(这里使用的是jdk的序列化方式来实现对象和byte数组之间的互转)

  • 所以我们传递一个非序列化的对象会参数非法的异常

自然而然的,我们会想有没有其他的MessageConverter来友好的支持任何类型的对象

5. 自定义MessageConverter

接下来我们希望通过自定义一个json序列化方式的MessageConverter来解决上面的问题

一个比较简单的实现(利用FastJson来实现序列化/反序列化)

public static class SelfConverter extends AbstractMessageConverter {
    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        messageProperties.setContentType("application/json");
        return new Message(JSON.toJSONBytes(object), messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return JSON.parse(message.getBody());
    }
}

重新定义一个rabbitTemplate,并设置它的消息转换器为自定义的SelfConverter

@Bean
public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new SelfConverter());
    return rabbitTemplate;
}

然后再次测试一下

@Service
public class JsonPublisher {
    @Autowired
    private RabbitTemplate jsonRabbitTemplate;
      
    private String publish1(String ans) {
        Map<String, Object> msg = new HashMap<>(8);
        msg.put("msg", ans);
        msg.put("type", "json");
        msg.put("version", 123);
        System.out.println("publish: " + msg);
        jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
        return msg.toString();
    }

    private String publish2(String ans) {
        BasicPublisher.NonSerDO nonSerDO = new BasicPublisher.NonSerDO(18, "SELF_JSON" + ans);
        System.out.println("publish: " + nonSerDO);
        jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
        return nonSerDO.toString();
    }
}

mq内接收到的推送消息如下

6. Jackson2JsonMessageConverter

上面虽然实现了Json格式的消息转换,但是比较简陋;而且这么基础通用的功能,按照Spring全家桶的一贯作风,肯定是有现成可用的,没错,这就是Jackson2JsonMessageConverter

所以我们的使用姿势也可以如下

//定义RabbitTemplate
@Bean
public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    return rabbitTemplate;
}


// 测试代码
@Autowired
private RabbitTemplate jacksonRabbitTemplate;
private String publish3(String ans) {
    Map<String, Object> msg = new HashMap<>(8);
    msg.put("msg", ans);
    msg.put("type", "jackson");
    msg.put("version", 456);
    System.out.println("publish: " + msg);
    jacksonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
    return msg.toString();
}

下面是通过Jackson序列化消息后的内容,与我们自定义的有一些不同,多了headerscontent_encoding

7. 小结

本篇博文主要的知识点如下

  • 通过RabbitTemplate#convertAndSend来实现消息分发
  • 通过MessagePostProcessor来自定义消息的属性(请注意默认投递的消息时持久化的)
  • 默认的消息封装类为SimpleMessageConverter,只支持分发byte数组,字符串和可序列化的对象;不满足上面三个条件的方法调用会抛异常
  • 我们可以通过实现MessageConverter接口,来定义自己的消息封装类,解决上面的问题

在RabbitMq的知识点博文中,明确提到了,为了确保消息被brocker正确接收,提供了消息确认机制和事务机制两种case,那么如果需要使用这两种方式,消息生产者需要怎么做呢?

限于篇幅,下一篇博文将带来在消息确认机制/事务机制下的发送消息使用姿势

II. 其他

0. 系列博文&项目源码

系列博文

项目源码

1.4 - 4.消息确认机制/事务的使用姿势

上一篇介绍了RabbitMq借助RabbitTemplate来发送消息的基本使用姿势,我们知道RabbitMq提供了两种机制,来确保发送端的消息被brocke正确接收,本文将主要介绍,在消息确认和事物两种机制的场景下,发送消息的使用姿势

I. 配置

首先创建一个SpringBoot项目,用于后续的演示

依赖配置文件pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </pluginManagement>
</build>
<repositories>
    <repository>
        <id>spring-snapshots</id>
        <name>Spring Snapshots</name>
        <url>https://repo.spring.io/libs-snapshot-local</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/libs-milestone-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-releases</id>
        <name>Spring Releases</name>
        <url>https://repo.spring.io/libs-release-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>

application.yml配置文件中,添加rabbitmq的相关属性

spring:
  rabbitmq:
    virtual-host: /
    username: admin
    password: admin
    port: 5672
    host: 127.0.0.1

II. 消息确认机制

本节来看一下消息确认机制的使用姿势,首先有必要了解一下什么是消息确认机制

1. 定义

简单来讲就是消息发送之后,需要接收到RabbitMq的正确反馈,然后才能判断消息是否正确发送成功;

一般来说,RabbitMq的业务逻辑包括以下几点

  • 生产者将信道设置成Confirm模式,一旦信道进入Confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(以confirm.select为基础从1开始计数)
  • 一旦消息被投递到所有匹配的队列之后,Broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了
  • 如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出
  • Broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号(此外Broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理)

2. 基本使用case

从上面的解释,可以知道发送消息端,需要先将信道设置为Confirm模式,RabbitProperties配置类中,有个属性,正好是用来设置的这个参数的,所以我们可以直接在配置文件application.yml中,添加下面的配置

spring:
  rabbitmq:
    # 在2.2.1.release版本中,下面这个配置属于删除状态,推荐使用后一种配置方式
    # publisher-confirms: true
    publisher-confirm-type: correlated
    # 下面这个配置,表示接收mq返回的确认消息
    publisher-returns: true

上面配置完毕之后,直接使用RabbitTemplate发送消息,表示已经支持Confirm模式了,但实际的使用,会有一点点区别,我们需要接收mq返回的消息,发送失败的回调(以实现重试逻辑等),所以一个典型的发送端代码可以如下

@Service
public class AckPublisher implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 接收发送后确认信息
     *
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("ack send succeed: " + correlationData);
        } else {
            System.out.println("ack send failed: " + correlationData + "|" + cause);
        }
    }

    /**
     * 发送失败的回调
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("ack " + message + " 发送失败");
    }


    /**
     * 一般的用法,推送消息
     *
     * @param ans
     * @return
     */
    public String publish(String ans) {
        String msg = "ack msg = " + ans;
        System.out.println("publish: " + msg);

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, correlationData);
        return msg;
    }
}

请注意上面的实现,首先需要给RabbitTemplate设置回调,这两个不可或缺

rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);

3. 手动配置方式

上面利用的是标准的SpringBoot配置,一般来说是适用于绝大多数的场景的;当不能覆盖的时候,还可以通过手动的方式来定义一个特定的RabbitTemplate(比如一个项目中,只有某一个场景的消息发送需要确认机制,其他的默认即可,所以需要区分RabbitTemplate)

在自动配置类中,可以手动的注册一个RabbitTemplate的bean,来专职消息确认模式的发送

@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private Integer port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;

@Bean
public RabbitTemplate ackRabbitTemplate() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost(host);
    connectionFactory.setPort(port);
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);
    connectionFactory.setVirtualHost(virtualHost);
    // 设置ack为true
    connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
    connectionFactory.setPublisherReturns(true);
    return new RabbitTemplate(connectionFactory);
}

至于使用姿势,和前面完全一致,只是将rabbitTemplate换成ackRabbitTemplate

III. 事务机制

消息确认机制属于异步模式,也就是说一个消息发送完毕之后,不待返回,就可以发送另外一条消息;这里就会有一个问题,publisher先后发送msg1, msg2,但是对RabbitMq而言,接收的顺序可能是msg2, msg1;所以消息的顺序可能会不一致

所以有了更加严格的事务机制,它属于同步模式,发送消息之后,等到接收到确认返回之后,才能发送下一条消息

1. 事务使用方式

首先我们定义一个事务管理器

/**
 * 配置rabbitmq事务
 *
 * @param connectionFactory
 * @return
 */
@Bean("rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}

@Bean
public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) {
    return new RabbitTemplate(connectionFactory);
}

事务机制的使用姿势,看起来和上面的消息确认差不多,无非是需要添加一个@Transactional注解罢了

@Service
public class TransactionPublisher implements RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate transactionRabbitTemplate;

    @PostConstruct
    public void init() {
        // 将信道设置为事务模式
        transactionRabbitTemplate.setChannelTransacted(true);
        transactionRabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("事务 " + message + " 发送失败");
    }

    /**
     * 一般的用法,推送消息
     *
     * @param ans
     * @return
     */
    @Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")
    public String publish(String ans) {
        String msg = "transaction msg = " + ans;
        System.out.println("publish: " + msg);

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        transactionRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, correlationData);
        return msg;
    }
}

请注意,核心代码设置信道为事务模式必不可少

// 将信道设置为事务模式
transactionRabbitTemplate.setChannelTransacted(true);

IV. 测试

我们这里主要测试一下事务和消息确认机制的性能对比吧,从定义上来看消息确认机制效率更高,我们简单的对比一下

@RestController
public class PubRest {
    @Autowired
    private AckPublisher ackPublisher;
    @Autowired
    private TransactionPublisher transactionPublisher;

    private AtomicInteger atomicInteger = new AtomicInteger(1);

    @GetMapping(path = "judge")
    public boolean judge(String name) {
        for (int i = 0; i < 10; i++) {
            long start = System.currentTimeMillis();
            ackPublisher.publish(name + atomicInteger.getAndIncrement());
            ackPublisher.publish(name + atomicInteger.getAndIncrement());
            ackPublisher.publish(name + atomicInteger.getAndIncrement());
            long mid = System.currentTimeMillis();
            System.out.println("ack cost: " + (mid - start));

            transactionPublisher.publish(name + atomicInteger.getAndIncrement());
            transactionPublisher.publish(name + atomicInteger.getAndIncrement());
            transactionPublisher.publish(name + atomicInteger.getAndIncrement());
            System.out.println("transaction cost: " + (System.currentTimeMillis() - mid));
        }
        return true;
    }
}

去掉无关的输出,仅保留耗时,对比如下(差距还是很明显的)

ack cost: 5
transaction cost: 111

ack cost: 3
transaction cost: 108

ack cost: 2
transaction cost: 101

ack cost: 3
transaction cost: 107

ack cost: 14
transaction cost: 106

ack cost: 2
transaction cost: 140

ack cost: 4
transaction cost: 124

ack cost: 4
transaction cost: 131

ack cost: 4
transaction cost: 129

ack cost: 2
transaction cost: 99

V. 其他

系列博文

项目源码

1.5 - 5.RabbitListener消费基本使用姿势

之前介绍了rabbitmq的消息发送姿势,既然有发送,当然就得有消费者,在SpringBoot环境下,消费可以说比较简单了,借助@RabbitListener注解,基本上可以满足你90%以上的业务开发需求

下面我们来看一下@RabbitListener的最最常用使用姿势

I. 配置

首先创建一个SpringBoot项目,用于后续的演示

依赖配置文件pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- 注意,下面这个不是必要的哦-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </pluginManagement>
</build>
<repositories>
    <repository>
        <id>spring-snapshots</id>
        <name>Spring Snapshots</name>
        <url>https://repo.spring.io/libs-snapshot-local</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/libs-milestone-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-releases</id>
        <name>Spring Releases</name>
        <url>https://repo.spring.io/libs-release-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>

application.yml配置文件中,添加rabbitmq的相关属性

spring:
  rabbitmq:
    virtual-host: /
    username: admin
    password: admin
    port: 5672
    host: 127.0.0.1

II. 消费姿势

本文将目标放在实用性上,将结合具体的场景来演示@RabbitListener的使用姿势,因此当你发现看完本文之后这个注解里面有些属性还是不懂,请不要着急,下一篇会一一道来

0. mock数据

消费消费,没有数据,怎么消费呢?所以我们第一步,先创建一个消息生产者,可以往exchange写数据,供后续的消费者测试使用

本篇的消费主要以topic模式来进行说明(其他的几个模式使用差别不大,如果有需求的话,后续补齐)

@RestController
public class PublishRest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(path = "publish")
    public boolean publish(String exchange, String routing, String data) {
        rabbitTemplate.convertAndSend(exchange, routing, data);
        return true;
    }
}

提供一个简单rest接口,可以指定往哪个exchange推送数据,并制定路由键

1. case1: exchange, queue已存在

对于消费者而言其实是不需要管理exchange的创建/销毁的,它是由发送者定义的;一般来讲,消费者更关注的是自己的queue,包括定义queue并与exchange绑定,而这一套过程是可以直接通过rabbitmq的控制台操作的哦

所以实际开发过程中,exchange和queue以及对应的绑定关系已经存在的可能性是很高的,并不需要再代码中额外处理;

在这种场景下,消费数据,可以说非常非常简单了,如下:

/**
 * 当队列已经存在时,直接指定队列名的方式消费
 *
 * @param data
 */
@RabbitListener(queues = "topic.a")
public void consumerExistsQueue(String data) {
    System.out.println("consumerExistsQueue: " + data);
}

直接指定注解中的queues参数即可,参数值为对列名(queueName)

2. case2: queue不存在

当queue的autoDelete属性为false时,上面的使用场景还是比较合适了;但是,当这个属性为true时,没有消费者队列就会自动删除了,这个时候再用上面的姿势,可能会得到下面的异常

队列不存在

通常这种场景下,是需要我们来主动创建Queue,并建立与Exchange的绑定关系,下面给出@RabbitListener的推荐使用姿势

/**
 * 队列不存在时,需要创建一个队列,并且与exchange绑定
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "topic.n1", durable = "false", autoDelete = "true"),
        exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), 
        key = "r"))
public void consumerNoQueue(String data) {
    System.out.println("consumerNoQueue: " + data);
}

一个注解,内部声明了队列,并建立绑定关系,就是这么神奇!!!

注意@QueueBinding注解的三个属性:

  • value: @Queue注解,用于声明队列,value为queueName, durable表示队列是否持久化, autoDelete表示没有消费者之后队列是否自动删除
  • exchange: @Exchange注解,用于声明exchange, type指定消息投递策略,我们这里用的topic方式
  • key: 在topic方式下,这个就是我们熟知的 routingKey

以上,就是在队列不存在时的使用姿势,看起来也不复杂

3. case3: ack

在前面rabbitmq的核心知识点学习过程中,会知道为了保证数据的一致性,有一个消息确认机制;

我们这里的ack主要是针对消费端而言,当我们希望更改默认ack方式(noack, auto, manual),可以如下处理

/**
 * 需要手动ack,但是不ack时
 *
 * @param data
 */
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n2", durable = "false", autoDelete = "true"),
        exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), ackMode = "MANUAL")
public void consumerNoAck(String data) {
    // 要求手动ack,这里不ack,会怎样?
    System.out.println("consumerNoAck: " + data);
}

上面的实现也比较简单,设置ackMode=MANUAL,手动ack

但是,请注意我们的实现中,没有任何一个地方体现了手动ack,这就相当于一致都没有ack,在后面的测试中,可以看出这种不ack时,会发现数据一直在unacked这一栏,当Unacked数量超过限制的时候,就不会再消费新的数据了

4. case4: manual ack

上面虽然选择ack方式,但是还缺一步ack的逻辑,接下来我们看一下如何补齐

/**
 * 手动ack
 *
 * @param data
 * @param deliveryTag
 * @param channel
 * @throws IOException
 */
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n3", durable = "false", autoDelete = "true"),
        exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), ackMode = "MANUAL")
public void consumerDoAck(String data, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)
        throws IOException {
    System.out.println("consumerDoAck: " + data);

    if (data.contains("success")) {
        // RabbitMQ的ack机制中,第二个参数返回true,表示需要将这条消息投递给其他的消费者重新消费
        channel.basicAck(deliveryTag, false);
    } else {
        // 第三个参数true,表示这个消息会重新进入队列
        channel.basicNack(deliveryTag, false, true);
    }
}

请注意,方法多了两个参数

  • deliveryTag: 相当于消息的唯一标识,用于mq辨别是哪个消息被ack/nak了
  • channel: mq和consumer之间的管道,通过它来ack/nak

当我们正确消费时,通过调用 basicAck 方法即可

// RabbitMQ的ack机制中,第二个参数返回true,表示需要将这条消息投递给其他的消费者重新消费
channel.basicAck(deliveryTag, false);

当我们消费失败,需要将消息重新塞入队列,等待重新消费时,可以使用 basicNack

// 第三个参数true,表示这个消息会重新进入队列
channel.basicNack(deliveryTag, false, true);

5. case5: 并发消费

当消息很多,一个消费者吭哧吭哧的消费太慢,但是我的机器性能又杠杠的,这个时候我就希望并行消费,相当于同时有多个消费者来处理数据

要支持并行消费,如下设置即可

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n4", durable = "false", autoDelete = "true"),
        exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), concurrency = "4")
public void multiConsumer(String data) {
    System.out.println("multiConsumer: " + data);
}

请注意注解中的concurrency = "4"属性,表示固定4个消费者;

除了上面这种赋值方式之外,还有一种 m-n 的格式,表示m个并行消费者,最多可以有n个

(额外说明:这个参数的解释实在SimpleMessageListenerContainer的场景下的,下一篇文章会介绍它与DirectMessageListenerContainer的区别)

6. 测试

通过前面预留的消息发送接口,我们在浏览器中请求: http://localhost:8080/publish?exchange=topic.e&routing=r&data=wahaha

消费

然后看一下输出,五个消费者都接收到了,特别是主动nak的那个消费者,一直在接收到消息;

(因为一直打印日志,所以重启一下应用,开始下一个测试)

然后再发送一条成功的消息,验证下手动真确ack,是否还会出现上面的情况,请求命令: http://localhost:8080/publish?exchange=topic.e&routing=r&data=successMsg

然后再关注一下,没有ack的那个队列,一直有一个unack的消息

II. 其他

系列博文

项目源码

1. 一灰灰Bloghttps://liuyueyi.github.io/hexblog

一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

2. 声明

尽信书则不如,以上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

3. 扫描关注

一灰灰blog

QrCode