RabbitMQ基础教程之Spring使用篇

相关博文,推荐查看:

  1. RabbitMq基础教程之安装与测试
  2. RabbitMq基础教程之基本概念
  3. RabbitMQ基础教程之基本使用篇
  4. RabbitMQ基础教程之使用进阶篇
  5. RabbitMQ基础教程之Spring&JavaConfig使用篇

在前面的一篇演示了如何使用Spring来进行RabbitMQ的消息投递和消费,虽然可以实现基本的需求场景,但是使用起来却并不是特别顺手,首先是不同的消费者,得添加好多不同的配置项,加上有较多的配置(QueueName, ExchangeName, RoutingKey, autoAck…)

那么有没有可能借助工厂方式,来简化消费者这边的大多数配置呢?

I. 工厂类定义消费者信息

目标比较清晰了,我们希望有一个工厂类,可以承载所有的关心的配置信息,然后在实际使用的地方,通过这个工厂类生成一个Consumer即可

1. 消费接口定义

首先需要定义一个公共的消费者接口,主要用来接收并处理消息

1
2
3
4
5
6
7
public interface IMqConsumer extends ChannelAwareMessageListener {

void setContainer(SimpleMessageListenerContainer container);

default void shutdown() {}

}

对于ChannelAwareMessageListener前面就以及用到,当有消息后,触发的监听器,这里我们增加了两个方法,其实主要就是干一件事情,优雅的关闭消费

当应用需要停止或者重启时,我们希望先优雅的关闭消息消费,那么就会用到 org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#stop()

因此针对这个功能,可以实现一个公共的抽象类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public abstract class AbsMQConsumer implements IMqConsumer {
private volatile boolean end = false;
private SimpleMessageListenerContainer container;
private boolean autoAck;

public void setContainer(SimpleMessageListenerContainer container) {
this.container = container;
autoAck = container.getAcknowledgeMode().isAutoAck();
}

public void shutdown() {
end = true;
}

protected void autoAck(Message message, Channel channel, boolean success) throws IOException {
if (autoAck) {
return;
}

if (success) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}

public void onMessage(Message message, Channel channel) throws Exception {
try {
autoAck(message, channel, process(message, channel));
} catch (Exception e) {
autoAck(message, channel, false);
throw e;
} finally {
if (end) {
container.stop();
}
}
}

public abstract boolean process(Message message, Channel channel);
}

上面的实现中,前面两个方法比较清晰,没有什么二意,需要关注的是onMessage方法的实现,我们默认封装了ack的逻辑,设计思路如下:

  • 当开启了手动ack之后,要求实际消费方实现 process 方法,并返回boolean,表示是否消费成功
    • 消费成功,则ack
    • 消费失败,则将消息重新丢回到队列
  • 若开启自动ack,则不需要关注
  • 每次消费一条消息之后,需要关注下是否关闭这个状态,从而实现mq的停止消费

所以每个实际消费者,实现这个抽象类的 process 方法即可,在内部实现自己的消息消费逻辑

2. 工厂类

前面主要定义了消费的实体可以怎么玩,接下来就是重头戏了,如何声明队列,如何绑定交换器等,如何注册消息监听器(即上面的Consumer)?

根据前面的实现,我们需要关注的几个参数依然是下面几个:

1
2
3
4
5
6
7
8
9
10
11
12
private String exchange;

private String queue;
private String routingKey;


private Boolean autoDeleted;
private Boolean durable;
private Boolean autoAck;

private ConnectionFactory connectionFactory;
private RabbitAdmin rabbitAdmin;

我们最终的目标就是给每个Consumer创建一个SimpleMessageListenerContainer的Bean交给Spring来托管,所以可以利用Spring的FactoryBean来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
@Data
@Builder
public class MQContainerFactory implements FactoryBean<SimpleMessageListenerContainer> {
private ExchangeType exchangeType;

private String directExchange;
private String topicExchange;
private String fanoutExchange;

private String queue;
private String routingKey;


private Boolean autoDeleted;
private Boolean durable;
private Boolean autoAck;

// 并发数
private Integer concurrentNum;

private ConnectionFactory connectionFactory;
private RabbitAdmin rabbitAdmin;


// 消费方
private IMqConsumer consumer;


private Exchange buildExchange() {
if (directExchange != null) {
exchangeType = ExchangeType.DIRECT;
return new DirectExchange(directExchange);
} else if (topicExchange != null) {
exchangeType = ExchangeType.TOPIC;
return new TopicExchange(topicExchange);
} else if (fanoutExchange != null) {
exchangeType = ExchangeType.FANOUT;
return new FanoutExchange(fanoutExchange);
} else {
if (StringUtils.isEmpty(routingKey)) {
throw new IllegalArgumentException("defaultExchange's routingKey should not be null!");
}
exchangeType = ExchangeType.DEFAULT;
return new DirectExchange("");
}
}


private Queue buildQueue() {
if (StringUtils.isEmpty(queue)) {
throw new IllegalArgumentException("queue name should not be null!");
}

return new Queue(queue, durable == null ? false : durable, false, autoDeleted == null ? true : autoDeleted);
}


private Binding bind(Queue queue, Exchange exchange) {
return exchangeType.binding(queue, exchange, routingKey);
}


private void check() {
if (rabbitAdmin == null || connectionFactory == null) {
throw new IllegalArgumentException("rabbitAdmin and connectionFactory should not be null!");
}

if (consumer == null) {
throw new IllegalArgumentException("rabbit msg consumer should not be null!");
}
}


@Override
public SimpleMessageListenerContainer getObject() throws Exception {
check();

Queue queue = buildQueue();
Exchange exchange = buildExchange();
Binding binding = bind(queue, exchange);

rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareBinding(binding);


SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setRabbitAdmin(rabbitAdmin);
container.setConnectionFactory(connectionFactory);
container.setQueues(queue);
container.setPrefetchCount(20);
// 这里表示支持Consumer并发消费
container.setConcurrentConsumers(concurrentNum == null ? 1 : concurrentNum);
container.setAcknowledgeMode(autoAck ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL);
container.setMessageListener(consumer);


consumer.setContainer(container);
return container;
}

@Override
public Class<?> getObjectType() {
return SimpleMessageListenerContainer.class;
}
}

具体的实现代码如上,接下来进行分块分析,首先是Exchange, 我们直到常用的有三种 Exchange:

  • DirectExchange
  • TopicExchange
  • FanoutExchange

因此,我们自定义了一个枚举,来实现不同的Exchange的绑定姿势,注意下面的实现姿势,利用了抽象类的思路

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public enum ExchangeType {
DIRECT {
@Override
public Binding binding(Queue queue, Exchange exchange, String routingKey) {
return BindingBuilder.bind(queue).to((DirectExchange) exchange).with(routingKey);
}
}, TOPIC {
@Override
public Binding binding(Queue queue, Exchange exchange, String routingKey) {
return BindingBuilder.bind(queue).to((TopicExchange) exchange).with(routingKey);
}
}, FANOUT {
@Override
public Binding binding(Queue queue, Exchange exchange, String routingKey) {
return BindingBuilder.bind(queue).to((FanoutExchange) exchange);
}
}, DEFAULT {
@Override
public Binding binding(Queue queue, Exchange exchange, String routingKey) {
// 对于Default而言,只能讲消息路由到名完全一直的queue上
return BindingBuilder.bind(queue).to((DirectExchange) exchange).with(queue.getName());
}
};

public abstract Binding binding(Queue queue, Exchange exchange, String routingKey);
}

剩下的就是 com.git.hui.rabbit.spring.component.MQContainerFactory#getObject 的逻辑了,基本上和前面的思路一样

  • 定义queue
  • 定义exchange
  • 创建绑定
  • 创建SimpleMessageListenerContainer,设置各种参数

3. 配置类

不可避免的需要一些配置,如何RabbitMQ的连接工厂,RabbitAmdin,这些是可以作为多个Consumer的公共Bean来使用的,因此就放在了配置类中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class FacSpringConfig {

@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
return factory;
}

@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
}

II. 测试验证

从代码实现角度来看,就几个类,还是比较简单的,接下来就看实际使用的姿势,是不是变简单了

新建一个消费类

1
2
3
4
5
6
7
8
public class FacMQConsumer extends AbsMQConsumer {
@Override
public boolean process(Message message, Channel channel) {
String data = new String(message.getBody());
System.out.println(" fac mq consumer: " + data);
return true;
}
}

然后定义这个消费类的配置信息,主要是两个Bean的定义,一个是定义上面的FactoryBean,内部通过Builder模式设置了各种参数(借助lombok实现);另外一个就是获取SimpleMessageListenerContainer容器了

1
2
3
4
5
6
7
8
9
10
11
12
13
@Bean
public MQContainerFactory mqContainerFactory(ConnectionFactory connectionFactory, RabbitAdmin rabbitAdmin) {
return MQContainerFactory.builder().queue("fac.direct").directExchange("fac.direct.exchange").durable(true)
.autoDeleted(false).autoAck(false).connectionFactory(connectionFactory).rabbitAdmin(rabbitAdmin)
.routingKey("fac-routing").consumer(new FacMQConsumer()).build();
}

@Bean
public SimpleMessageListenerContainer facContainer(ConnectionFactory connectionFactory, RabbitAdmin rabbitAdmin)
throws Exception {
MQContainerFactory fac = mqContainerFactory(connectionFactory, rabbitAdmin);
return fac.getObject();
}

对应的测试类可以如下实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = FacSpringConfig.class)
public class FactoryComponentUnit {

@Autowired
private AmqpProducer amqpProducer;

@Test
public void testDirectConsumer() throws InterruptedException {
String[] routingKey = new String[]{"hello.world", "fac-routing", "test1"};
for (int i = 0; i < 10; i++) {
amqpProducer.publishMsg("fac.direct.exchange", routingKey[i % 3],
">>> hello " + routingKey[i % 3] + ">>> " + i);
}
System.out.println("-------over---------");

Thread.sleep(1000 * 60 * 10);
}
}

然后就可以愉快的玩耍了

III. 其他

项目地址

一灰灰Bloghttps://liuyueyi.github.io/hexblog

一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

声明

尽信书则不如,已上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

扫描关注

QrCode