【DB系列】借助redis来实现延时队列(应用篇)

文章目录
  1. I. 方案设计
    1. 1. redis过期时间
    2. 2. redis zset
  2. II. redis演示队列实现
    1. 1. 环境配置
    2. 2. 核心实现
    3. 3. 写入队列
    4. 4. 定时取演示队列消息
    5. 5. 消息消费
    6. 5. 测试demo
    7. 6. 小结
  3. III. 不能错过的源码和相关知识点
    1. 0. 项目
    2. 1. 微信公众号:一灰灰Blog

延时队列,相信各位小伙伴并不会陌生,jdk原生提供了延时队列的使用,当然我们这里介绍的不是这种;在实际的项目中,如果我们有延时队列的场景,可以怎样去实现呢

举一个简单的例子,如下单15分钟内,若没有支付,则自动取消订单

本文将介绍一种非常非常简单的实现方式

I. 方案设计

要实现15分钟后自动取消订单,这个也太简单了,来给出一段神级代码

1
2
3
4
5
new Thread(() -> {
// 休眠十五分钟,执行取消订单
Thread.sleep(15 * 60 * 1000);
cancelOrder();
}).start();

好的,本文就此结束(开玩笑….)

忽略上面的段子,接下来想一想,如果让我们来实现一个延时队列,可以怎么整?

  • 单机:
    • DelayQueue
    • 定时任务
  • 分布式:
    • Quartz定时任务
    • rabbitmq延时队列
    • redis zset
    • redis 过期回调
    • 时间轮

首先我们这里排除掉单机版,至于原因,现在单体单实例应用实在不多见了,直接来看多实例的情况吧

在上面的几种方案中,重心放在redis上,两种case,下面分别介绍一下

1. redis过期时间

我们知道,在使用redis做缓存时,可以设置失效时间,借助redis的失效事件,我们可以来实现延时队列的场景

比如,现在一个订单,我们在redis中新加入一个订单id,失效时间设置为15分钟;当支付成功之后,主动删除这个缓存;若一直没有付钱,则15分钟后,触发一个过期事件,然后订阅这个事件,来执行取消订单

上面这种实现,有两个问题

  • key失效监听,可能存在大量的无效信息
  • 广播方式消费事件,多实例接收到这个事件,怎么防并发?或者没有一个实例接收到这个事件,那么这个取消订单就会漏掉

显然上面的第二点,漏消息是不能接受的

2. redis zset

zset属于redis提供的几个基本数据结构中的一种,它的特点是有 value + score

如果我们想使用zset拉实现演示队列,那么一个可选的方案就是将score设置为触发的时间戳,value为业务值

然后写一个定时任务,不断的从zset中,取出score小于当前时间戳的数据,任务它们都是已经到期可以执行的

借助这个方案,可以相对简单的实现一个演示队列了

II. redis演示队列实现

1. 环境配置

接下来我们将以redis的zset来实现延时队列,本文借助SpringBoot来搭建一个演示工程,使用的基本配置如下

本项目借助SpringBoot 2.2.1.RELEASE + maven 3.5.3 + IDEA进行开发

核心依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- 下面这里两个非必须,主要是后面的实现演示使用到了 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>

redis使用默认的配置,本机 localhost + 6379

2. 核心实现

借助redis zset来实现延时队列,具体的实现代码就很简单了,无非是从zset中取出score小于当前时间戳的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static final Long DELETE_SUCCESS = 1L;
@Autowired
private StringRedisTemplate redisTemplate;

public String fetchOne(String key) {
Set<String> sets = redisTemplate.opsForZSet().rangeByScore(key, 0, System.currentTimeMillis(), 0, 3);
if (CollectionUtils.isEmpty(sets)) {
return null;
}

for (String val : sets) {
if (DELETE_SUCCESS.equals(redisTemplate.opsForZSet().remove(key, val))) {
// 删除成功,表示抢占到
return val;
}
}
return null;
}

注意上面的实现,有一个点需要说一下

zset:每次查询时取了三个数据,然后遍历获取到的数据,依次尝试去删除,若删除成功,则表示当前实例抢占到了这个消息

为什么这样设计?

这里有两个点,先解释第一个,为啥先查后删

如果我们按照正常的实现流程,每次从zset中取一个,但是无法保证这个时候就只有我一个人拿到了这个数据,在多实例的场景下,可能存在多个实例同时拿到了它,那么如何才能表示只有我一个人霸占了她呢(忽然进入言情的世界😓)

借助redis的单线程机制,只可能有一个实例会删除成功,所以拿到并删除成功的那个小伙伴,就是最终的幸运儿;

因此实现细节就是先查,后删,若删除成功,表示获取成功;否则表示被其他的实例捷足先登

接下来再看第二个,为啥一次拿三个

从上面的分析可以看出,如果我一次只拿一个,那么我抢占到的几率并不太大,特别是当实例比较多时,可能会做多次的无效操作;为了减少这个可能性,所以我一次多拿几个做备选,这样抢占到的概率就会高一些,至于为什么是3,这个就看实际的实例与定时任务的执行间隔了

3. 写入队列

上面是从队列中拿数据,有拿当然得有写,所以我们简单的封装一下写入队列的case

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
public class RedisDelayListWrapper implements ApplicationContextAware {
private static final Long DELETE_SUCCESS = 1L;

private Set<String> topic = new CopyOnWriteArraySet<>();

public void publish(String key, Object val, long delayTime) {
topic.add(key);
String strVal = val instanceof String ? (String) val : JSONObject.toJSONString(val);

redisTemplate.opsForZSet().add(key, strVal, System.currentTimeMillis() + delayTime);
}
}

4. 定时取演示队列消息

接下来就是一个定时任务,不断的调用上面的实现,从zset中获取到期的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Scheduled(fixedRate = 10_000)
public void schedule() {
for (String specialTopic : topic) {
String cell = fetchOne(specialTopic);
if (cell != null) {
applicationContext.publishEvent(new DelayMsg(this, cell, specialTopic));
}
}
}

@ToString
public static class DelayMsg extends ApplicationEvent {
@Getter
private String msg;
@Getter
private String topic;

public DelayMsg(Object source, String msg, String topic) {
super(source);
this.msg = msg;
this.topic = topic;
}
}

上面的定时任务,直接借助Spring的@Schedule来实现,遍历所有的topic,捞出数据之后,通过spring的 event/listener事件机制来实现消息处理的解耦

5. 消息消费

最终就是我们的消息消费逻辑了,主要就是消费前面抛出的DelayMsg,我们这里借助AOP来实现消息过滤

定义一个注解Consumer,用来指定消费哪个topic

1
2
3
4
5
6
7
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface Consumer {
String topic();
}

注意这个注解上面还有 @EventListener,表明它可以监听的spring的事件

aop拦截逻辑,根据topic进行过滤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Aspect
@Component
public class ConsumerAspect {

@Around("@annotation(consumer)")
public Object around(ProceedingJoinPoint joinPoint, Consumer consumer) throws Throwable {
Object[] args = joinPoint.getArgs();
boolean check = false;
for (Object obj : args) {
if (obj instanceof RedisDelayListWrapper.DelayMsg) {
check = consumer.topic().equals(((RedisDelayListWrapper.DelayMsg) obj).getTopic());
}
}

if (!check) {
// 不满足条件,直接忽略
return null;
}

// topic匹配成功,执行
return joinPoint.proceed();
}
}

5. 测试demo

最后写一个测试demo,验证下上面的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@EnableScheduling
@RestController
@SpringBootApplication
public class Application {
private static final String TEST_DELAY_QUEUE = "test";
private static final String DEMO_DELAY_QUEUE = "demo";
@Autowired
private RedisDelayListWrapper redisDelayListWrapper;

private Random random = new Random();

public static void main(String[] args) {
SpringApplication.run(Application.class);
}

@GetMapping(path = "publish")
public String publish(String msg, Long delayTime) {
if (delayTime == null) {
delayTime = 10_000L;
}

String queue = random.nextBoolean() ? TEST_DELAY_QUEUE : DEMO_DELAY_QUEUE;
msg = queue + "#" + msg + "#" + (System.currentTimeMillis() + delayTime);
redisDelayListWrapper.publish(queue, msg, delayTime);
System.out.println("延时: " + delayTime + "ms后消费: " + msg + " now:" + LocalDateTime.now());
return "success!";
}


@Consumer(topic = TEST_DELAY_QUEUE)
public void consumer(RedisDelayListWrapper.DelayMsg delayMsg) {
System.out.println("TEST消费延时消息: " + delayMsg + " at:" + System.currentTimeMillis());
}

@Consumer(topic = DEMO_DELAY_QUEUE)
public void consumerDemo(RedisDelayListWrapper.DelayMsg delayMsg) {
System.out.println("DEMO消费延时消息: " + delayMsg + " at:" + System.currentTimeMillis());
}
}

6. 小结

本文属于一个实战小技巧,借助redis的zset来灵活的实现一个简单的延时队列,实现倒是没有太大的难度,其中的一些小细节还是挺有意思的,好的,今天分享到此over,欢迎各位老铁来撩,公众号 一灰灰blog 你值得拥有

III. 不能错过的源码和相关知识点

0. 项目

1. 微信公众号:一灰灰Blog

尽信书则不如,以上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

下面一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

一灰灰blog


打赏 如果觉得我的文章对您有帮助,请随意打赏。
分享到